diff options
author | Rupert Smith <rupertlssmith@apache.org> | 2007-07-17 16:22:16 +0000 |
---|---|---|
committer | Rupert Smith <rupertlssmith@apache.org> | 2007-07-17 16:22:16 +0000 |
commit | d96489e6a64861e943a9adffadc5bbcf18632069 (patch) | |
tree | 51485fc92091c4c3a92f0d84effc9b9f5991b5c2 | |
parent | e061b189402d96fbcaeb792c714461be21cd057f (diff) | |
download | qpid-python-d96489e6a64861e943a9adffadc5bbcf18632069.tar.gz |
Refactored the distributed test clients and coordinator to support different distribution and sequencing engines.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@556958 13f79535-47bb-0310-9956-ffa450edef68
26 files changed, 2078 insertions, 2166 deletions
diff --git a/java/integrationtests/docs/RunningSustainedTests.txt b/java/integrationtests/docs/RunningSustainedTests.txt index 2b37f4c5a7..c483709f61 100644 --- a/java/integrationtests/docs/RunningSustainedTests.txt +++ b/java/integrationtests/docs/RunningSustainedTests.txt @@ -1,15 +1,17 @@ In addition to the integration tests the framework provided by this package also allows for sustained tests to be run. Currently avaible tests: -- org.apache.qpid.sustained.SustainedTestClient : Pub Sub test to determine steady state throughput. +- org.apache.qpid.sustained.SustainedClientTestCase : Pub Sub test to determine steady state throughput. Running Tests. Run the tests as per the integration tests. - - Start a broker - - Start at least one Client [java org.apache.qpid.sustained.TestClient], ensuring unique naming - - Start Test Controller [java org.apache.qpid.sustained.TestCoordinator] - - Additional Test clients can be started: - [java org.apache.qpid.sustained.TestClient -j org.apache.qpid.sustained.SustainedTestClient] +- Start a broker +- Start at least one test client [java org.apache.qpid.interop.TestClient], ensuring unique naming. + +- Start the test coordinator with the 'fanout' engine, on the sustained test case [java org.apache.qpid.interop.coordinator.Coordinator] + +- Additional Test clients can be started and joined into the running test: [java org.apache.qpid.interop.TestClient -j] + diff --git a/java/integrationtests/pom.xml b/java/integrationtests/pom.xml index 9ccd153f54..89fd5ede28 100644 --- a/java/integrationtests/pom.xml +++ b/java/integrationtests/pom.xml @@ -40,12 +40,16 @@ <dependencies>
- <!-- These tests depend on the client API only. -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java index 0eb6be3a91..c6efe05c61 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java @@ -20,31 +20,33 @@ */
package org.apache.qpid.interop.coordinator;
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.jms.*;
-
import junit.framework.Test;
import junit.framework.TestResult;
import junit.framework.TestSuite;
import org.apache.log4j.Logger;
-import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase1DummyRun;
-import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase2BasicP2P;
-import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase3BasicPubSub;
-import org.apache.qpid.interop.testclient.TestClient;
-import org.apache.qpid.util.CommandLineParser;
+import org.apache.qpid.interop.coordinator.testcases.InteropTestCase1DummyRun;
+import org.apache.qpid.interop.coordinator.testcases.InteropTestCase2BasicP2P;
+import org.apache.qpid.interop.coordinator.testcases.InteropTestCase3BasicPubSub;
+import org.apache.qpid.test.framework.MessagingTestConfigProperties;
+import org.apache.qpid.test.framework.TestUtils;
import org.apache.qpid.util.ConversationFactory;
import org.apache.qpid.util.PrettyPrintingUtils;
import uk.co.thebadgerset.junit.extensions.TKTestResult;
import uk.co.thebadgerset.junit.extensions.TKTestRunner;
import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
+import uk.co.thebadgerset.junit.extensions.util.CommandLineParser;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
+import javax.jms.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+
/**
* <p/>Implements the coordinator client described in the interop testing specification
* (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). This coordinator is built on
@@ -57,12 +59,33 @@ import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; * <tr><td> Attach XML test result logger.
* <tr><td> Terminate the interop testing framework.
* </table>
+ *
+ * @todo The test result is ignored, because it only contains the failures for the last test run. Shoud accumulate
+ * failures over all tests, and return with success or fail code based on all results.
+ *
+ * @todo Remove hard coding of test cases and put on command line instead.
*/
public class Coordinator extends TKTestRunner
{
+ /** Used for debugging. */
private static final Logger log = Logger.getLogger(Coordinator.class);
- public static final String DEFAULT_CONNECTION_PROPS_RESOURCE = "org/apache/qpid/interop/connection.properties";
+ /** Defines the possible distributed test engines available to run coordinated test cases with. */
+ public enum TestEngine
+ {
+ /** Specifies the interop test engine. This tests all available clients in pairs. */
+ INTEROP,
+
+ /** Specifies the fanout test engine. This sets up one publisher role, and many reciever roles. */
+ FANOUT
+ }
+
+ /**
+ * Holds the test context properties that provides the default test parameters, plus command line overrides.
+ * This is initialized with the default test parameters, to which command line overrides may be applied.
+ */
+ protected static ParsedProperties testContextProperties =
+ TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
/** Holds the URL of the broker to coordinate the tests on. */
protected String brokerUrl;
@@ -86,21 +109,28 @@ public class Coordinator extends TKTestRunner protected String currentTestClassName;
/** Holds the path of the directory to output test results too, if one is defined. */
- protected static String _reportDir;
+ protected String reportDir;
+
+ /** Holds the coordinating test engine type to run the tests through. */
+ protected TestEngine engine;
/**
* Creates an interop test coordinator on the specified broker and virtual host.
*
* @param brokerUrl The URL of the broker to connect to.
* @param virtualHost The virtual host to run all tests on. Optional, may be <tt>null</tt>.
+ * @param reportDir The directory to write out test results to.
+ * @param engine The distributed test engine type to run the tests with.
*/
- public Coordinator(String brokerUrl, String virtualHost)
+ public Coordinator(String brokerUrl, String virtualHost, String reportDir, TestEngine engine)
{
log.debug("Coordinator(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
// Retain the connection parameters.
this.brokerUrl = brokerUrl;
this.virtualHost = virtualHost;
+ this.reportDir = reportDir;
+ this.engine = engine;
}
/**
@@ -116,59 +146,69 @@ public class Coordinator extends TKTestRunner */
public static void main(String[] args)
{
- try
+ // Override the default broker url to be localhost:5672.
+ testContextProperties.setProperty(MessagingTestConfigProperties.BROKER_PROPNAME, "tcp://localhost:5672");
+
+ // Use the command line parser to evaluate the command line with standard handling behaviour (print errors
+ // and usage then exist if there are errors).
+ // Any options and trailing name=value pairs are also injected into the test context properties object,
+ // to override any defaults that may have been set up.
+ Properties options =
+ CommandLineParser.processCommandLine(args,
+ new CommandLineParser(
+ new String[][]
+ {
+ { "b", "The broker URL.", "broker", "false" },
+ { "h", "The virtual host to use.", "virtual host", "false" },
+ { "o", "The name of the directory to output test timings to.", "dir", "false" },
+ {
+ "e", "The test execution engine to use. Default is interop.", "engine", "interop",
+ "^interop$|^fanout$", "true"
+ }
+ }), testContextProperties);
+
+ // Extract the command line options.
+ String brokerUrl = options.getProperty("b");
+ String virtualHost = options.getProperty("h");
+ String reportDir = options.getProperty("o");
+ String testEngine = options.getProperty("e");
+ TestEngine engine = "fanout".equals(testEngine) ? TestEngine.FANOUT : TestEngine.INTEROP;
+ reportDir = (reportDir == null) ? "." : reportDir;
+
+ // If broker or virtual host settings were specified as command line options, override the defaults in the
+ // test context properties with them.
+
+ // Scan for available test cases using a classpath scanner.
+ // Hard code the test classes till the classpath scanner is fixed.
+ Collection<Class<? extends InteropTestCase>> testCaseClasses = new ArrayList<Class<? extends InteropTestCase>>();
+ // ClasspathScanner.getMatches(InteropTestCase.class, "^Test.*", true);
+ Collections.addAll(testCaseClasses, InteropTestCase1DummyRun.class, InteropTestCase2BasicP2P.class,
+ InteropTestCase3BasicPubSub.class);
+
+ // Check that some test classes were actually found.
+ if (testCaseClasses.isEmpty())
{
- // Use the command line parser to evaluate the command line with standard handling behaviour (print errors
- // and usage then exist if there are errors).
- Properties options =
- CommandLineParser.processCommandLine(args,
- new CommandLineParser(
- new String[][]
- {
- {"b", "The broker URL.", "broker", "false"},
- {"h", "The virtual host to use.", "virtual host", "false"},
- {"o", "The name of the directory to output test timings to.", "dir", "false"}
- }));
-
- // Extract the command line options.
- String brokerUrl = options.getProperty("b");
- String virtualHost = options.getProperty("h");
- _reportDir = options.getProperty("o");
- _reportDir = (_reportDir == null) ? "." : _reportDir;
-
- // Scan for available test cases using a classpath scanner.
- Collection<Class<? extends CoordinatingTestCase>> testCaseClasses =
- new ArrayList<Class<? extends CoordinatingTestCase>>();
- // ClasspathScanner.getMatches(CoordinatingTestCase.class, "^Test.*", true);
- // Hard code the test classes till the classpath scanner is fixed.
- Collections.addAll(testCaseClasses,
- CoordinatingTestCase1DummyRun.class,
- CoordinatingTestCase2BasicP2P.class,
- CoordinatingTestCase3BasicPubSub.class);
-
- // Check that some test classes were actually found.
- if (testCaseClasses.isEmpty())
- {
- throw new RuntimeException(
- "No test classes implementing CoordinatingTestCase were found on the class path.");
- }
-
- int i = 0;
- String[] testClassNames = new String[testCaseClasses.size()];
+ throw new RuntimeException("No test classes implementing InteropTestCase were found on the class path.");
+ }
- for (Class testClass : testCaseClasses)
- {
- testClassNames[i++] = testClass.getName();
- }
+ // Extract the names of all the test classes, to pass to the start method.
+ int i = 0;
+ String[] testClassNames = new String[testCaseClasses.size()];
- // Create a coordinator and begin its test procedure.
- Coordinator coordinator = new Coordinator(brokerUrl, virtualHost);
+ for (Class testClass : testCaseClasses)
+ {
+ testClassNames[i++] = testClass.getName();
+ }
- boolean failure = false;
+ // Create a coordinator and begin its test procedure.
+ Coordinator coordinator = new Coordinator(brokerUrl, virtualHost, reportDir, engine);
+ try
+ {
TestResult testResult = coordinator.start(testClassNames);
- if (failure)
+ // Return different error codes, depending on whether or not there were test failures.
+ if (testResult.failureCount() > 0)
{
System.exit(FAILURE_EXIT);
}
@@ -186,7 +226,7 @@ public class Coordinator extends TKTestRunner }
/**
- * Starts all of the test classes to be run by this coordinator running.
+ * Starts all of the test classes to be run by this coordinator.
*
* @param testClassNames An array of all the coordinating test case implementations.
*
@@ -197,10 +237,10 @@ public class Coordinator extends TKTestRunner public TestResult start(String[] testClassNames) throws Exception
{
log.debug("public TestResult start(String[] testClassNames = " + PrettyPrintingUtils.printArray(testClassNames)
- + ": called");
+ + ": called");
// Connect to the broker.
- connection = TestClient.createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost);
+ connection = TestUtils.createConnection(TestContextProperties.getInstance());
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination controlTopic = session.createTopic("iop.control");
@@ -231,7 +271,7 @@ public class Coordinator extends TKTestRunner // Record the current test class, so that the test results can be output to a file incorporating this name.
this.currentTestClassName = testClassName;
- result = super.start(new String[]{testClassName});
+ result = super.start(new String[] { testClassName });
}
// At this point in time, all tests have completed. Broadcast the shutdown message.
@@ -255,7 +295,7 @@ public class Coordinator extends TKTestRunner public static Set<TestClientDetails> extractEnlists(Collection<Message> enlists) throws JMSException
{
log.debug("public static Set<TestClientDetails> extractEnlists(Collection<Message> enlists = " + enlists
- + "): called");
+ + "): called");
Set<TestClientDetails> enlistedClients = new HashSet<TestClientDetails>();
@@ -304,17 +344,17 @@ public class Coordinator extends TKTestRunner Test nextTest = suite.testAt(i);
log.debug("suite.testAt(" + i + ") = " + nextTest);
- if (nextTest instanceof CoordinatingTestCase)
+ if (nextTest instanceof InteropTestCase)
{
- log.debug("nextTest is a CoordinatingTestCase");
+ log.debug("nextTest is a InteropTestCase");
}
}
targetTest = new WrappedSuiteTestDecorator(suite);
log.debug("Wrapped with a WrappedSuiteTestDecorator.");
}
- // Wrap the tests in an inviting test decorator, to perform the invite/test cycle.
+ // Wrap the tests in a suitable distributed test decorator, to perform the invite/test cycle.
targetTest = newTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
TestSuite suite = new TestSuite();
@@ -326,9 +366,28 @@ public class Coordinator extends TKTestRunner return super.doRun(suite, wait);
}
- protected WrappedSuiteTestDecorator newTestDecorator(WrappedSuiteTestDecorator targetTest, Set<TestClientDetails> enlistedClients, ConversationFactory conversationFactory, Connection connection)
+ /**
+ * Creates a wrapped test decorator, that is capable of inviting enlisted clients to participate in a specified
+ * test. This is the test engine that sets up the roles and sequences a distributed test case.
+ *
+ * @param targetTest The test decorator to wrap.
+ * @param enlistedClients The enlisted clients available to run the test.
+ * @param conversationFactory The conversation factory used to build conversation helper over the specified connection.
+ * @param connection The connection to talk to the enlisted clients over.
+ *
+ * @return An invititing test decorator, that invites all the enlisted clients to participate in tests, in pairs.
+ */
+ protected InvitingTestDecorator newTestDecorator(WrappedSuiteTestDecorator targetTest,
+ Set<TestClientDetails> enlistedClients, ConversationFactory conversationFactory, Connection connection)
{
- return new InvitingTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
+ switch (engine)
+ {
+ case FANOUT:
+ return new FanOutTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
+ case INTEROP:
+ default:
+ return new InteropTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
+ }
}
/**
@@ -343,18 +402,18 @@ public class Coordinator extends TKTestRunner TKTestResult result = new TKTestResult(fPrinter.getWriter(), delay, verbose, testCaseName);
// Check if a directory to output reports to has been specified and attach test listeners if so.
- if (_reportDir != null)
+ if (reportDir != null)
{
// Create the report directory if it does not already exist.
- File reportDirFile = new File(_reportDir);
+ File reportDirFile = new File(reportDir);
if (!reportDirFile.exists())
{
reportDirFile.mkdir();
}
- // Create the timings file (make the name of this configurable as a command line parameter).
- Writer timingsWriter = null;
+ // Create the results file (make the name of this configurable as a command line parameter).
+ Writer timingsWriter;
try
{
@@ -366,7 +425,7 @@ public class Coordinator extends TKTestRunner throw new RuntimeException("Unable to create the log file to write test results to: " + e, e);
}
- // Set up a CSV results listener to output the timings to the results file.
+ // Set up an XML results listener to output the timings to the results file.
XMLTestListener listener = new XMLTestListener(timingsWriter, currentTestClassName);
result.addListener(listener);
result.addTKTestListener(listener);
@@ -385,9 +444,4 @@ public class Coordinator extends TKTestRunner return result;
}
-
- public void setReportDir(String reportDir)
- {
- _reportDir = reportDir;
- }
}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/DropInTest.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/DropInTest.java new file mode 100644 index 0000000000..f7e38fb1ad --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/DropInTest.java @@ -0,0 +1,51 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop.coordinator;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+/**
+ * A DropIn test is a test case that can accept late joining test clients into a running test. This can be usefull,
+ * for interactive experimentation.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities
+ * <tr><td> Accept late joining test clients.
+ * </table>
+ */
+public interface DropInTest
+{
+ /**
+ * Should accept a late joining client into a running test case. The client will be enlisted with a control message
+ * with the 'CONTROL_TYPE' field set to the value 'LATEJOIN'. It should also provide values for the fields:
+ *
+ * <p/><table>
+ * <tr><td> CLIENT_NAME <td> A unique name for the new client.
+ * <tr><td> CLIENT_PRIVATE_CONTROL_KEY <td> The key for the route on which the client receives its control messages.
+ * </table>
+ *
+ * @param message The late joiners join message.
+ *
+ * @throws JMSException Any JMS Exception are allowed to fall through, indicating that the join failed.
+ */
+ public void lateJoin(Message message) throws JMSException;
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/FanOutTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/FanOutTestCase.java new file mode 100644 index 0000000000..ba737dffab --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/FanOutTestCase.java @@ -0,0 +1,179 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop.coordinator;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.test.framework.TestUtils;
+import org.apache.qpid.util.ConversationFactory;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * FanOutTestCase is a {@link org.apache.qpid.interop.coordinator.InteropTestCase} across one sending client and
+ * zero or more receiving clients. Its main purpose is to coordinate the setting up of one test client in the sending
+ * role and the remainder in the receiving role.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Accept notification of test case participants.
+ * <td> {@link org.apache.qpid.interop.coordinator.InvitingTestDecorator}
+ * <tr><td> Accept JMS Connection to carry out the coordination over.
+ * <tr><td> Coordinate the test sequence amongst participants. <td> {@link ConversationFactory}
+ * <tr><td> Supply test properties
+ * </table>
+ *
+ * @todo Gather all the receivers reports.
+ */
+public abstract class FanOutTestCase extends InteropTestCase
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(FanOutTestCase.class);
+
+ /** The test clients in the receiving role. */
+ private List<TestClientDetails> receivers = new LinkedList<TestClientDetails>();
+
+ /**
+ * Creates a new coordinating test case with the specified name.
+ *
+ * @param name The test case name.
+ */
+ public FanOutTestCase(String name)
+ {
+ super(name);
+ }
+
+ /**
+ * Adds a receiver to this test.
+ *
+ * @param receiver The contact details of the sending client in the test.
+ */
+ public void setReceiver(TestClientDetails receiver)
+ {
+ receivers.add(receiver);
+ }
+
+ /**
+ * Holds a test coordinating conversation with the test clients. This is the basic implementation of the inner loop
+ * of Use Case 5. It consists of assigning the test roles, begining the test and gathering the test reports from the
+ * participants.
+ *
+ * @param testProperties The test case definition.
+ *
+ * @return The test results from the senders and receivers. The senders report will always be returned first,
+ * followed by the receivers reports.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ protected Message[] sequenceTest(Map<String, Object> testProperties) throws JMSException
+ {
+ log.debug("protected Message[] sequenceTest(Object... testProperties = " + testProperties + "): called");
+
+ // Create a conversation on the sender clients private control rouete.
+ Session session = conversationFactory.getSession();
+ Destination senderControlTopic = session.createTopic(sender.privateControlKey);
+ ConversationFactory.Conversation senderConversation = conversationFactory.startConversation();
+
+ // Assign the sender role to the sending test client.
+ Message assignSender = conversationFactory.getSession().createMessage();
+ setPropertiesOnMessage(assignSender, testProperties);
+ assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
+ assignSender.setStringProperty("ROLE", "SENDER");
+ assignSender.setStringProperty("CLIENT_NAME", "Sustained_SENDER");
+
+ senderConversation.send(senderControlTopic, assignSender);
+
+ // Wait for the sender to confirm its role.
+ senderConversation.receive();
+
+ // Assign the receivers roles.
+ for (TestClientDetails receiver : receivers)
+ {
+ assignReceiverRole(receiver, testProperties, true);
+ }
+
+ // Start the test on the sender.
+ Message start = session.createMessage();
+ start.setStringProperty("CONTROL_TYPE", "START");
+
+ senderConversation.send(senderControlTopic, start);
+
+ // Wait for the test sender to return its report.
+ Message senderReport = senderConversation.receive();
+ TestUtils.pause(500);
+
+ // Ask the receivers for their reports.
+ Message statusRequest = session.createMessage();
+ statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST");
+
+ // Gather the reports from all of the receiving clients.
+
+ // Return all of the test reports, the senders report first.
+ return new Message[] { senderReport };
+ }
+
+ /**
+ * Assigns the receiver role to the specified test client that is to act as a receiver during the test. This method
+ * does not always wait for the receiving clients to confirm their role assignments. This is because this method
+ * may be called from an 'onMessage' method, when a client is joining the test at a later point in time, and it
+ * is not possible to do a synchronous receive during an 'onMessage' method. There is a flag to indicate whether
+ * or not to wait for role confirmations.
+ *
+ * @param receiver The test client to assign the receiver role to.
+ * @param testProperties The test parameters.
+ * @param confirm Indicates whether role confirmation should be waited for.
+ *
+ * @throws JMSException Any JMSExceptions occurring during the conversation are allowed to fall through.
+ */
+ protected void assignReceiverRole(TestClientDetails receiver, Map<String, Object> testProperties, boolean confirm)
+ throws JMSException
+ {
+ log.info("assignReceiverRole(TestClientDetails receiver = " + receiver + ", Map<String, Object> testProperties = "
+ + testProperties + "): called");
+
+ // Create a conversation with the receiving test client.
+ Session session = conversationFactory.getSession();
+ Destination receiverControlTopic = session.createTopic(receiver.privateControlKey);
+ ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation();
+
+ // Assign the receiver role to the receiving client.
+ Message assignReceiver = session.createMessage();
+ setPropertiesOnMessage(assignReceiver, testProperties);
+ assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
+ assignReceiver.setStringProperty("ROLE", "RECEIVER");
+ assignReceiver.setStringProperty("CLIENT_NAME", receiver.clientName);
+
+ receiverConversation.send(receiverControlTopic, assignReceiver);
+
+ // Wait for the role confirmation to come back.
+ if (confirm)
+ {
+ receiverConversation.receive();
+ }
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/FanOutTestDecorator.java index 4312dfbcc6..5e3fb51b97 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/FanOutTestDecorator.java @@ -1,200 +1,182 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.interop.coordinator; - -import junit.framework.Test; -import junit.framework.TestResult; -import org.apache.log4j.Logger; -import org.apache.qpid.util.ConversationFactory; -import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import java.util.Collection; -import java.util.Iterator; -import java.util.Set; - -/** - * <p><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Broadcast test - * invitations and collect enlists. <td> {@link ConversationFactory}. <tr><td> Output test failures for clients - * unwilling to run the test case. <td> {@link Coordinator} <tr><td> Execute coordinated test cases. <td> {@link - * CoordinatingTestCase} </table> - */ -public class ListeningTestDecorator extends WrappedSuiteTestDecorator implements MessageListener -{ - private static final Logger log = Logger.getLogger(ListeningTestDecorator.class); - - /** Holds the contact information for all test clients that are available and that may take part in the test. */ - Set<TestClientDetails> allClients; - - /** Holds the conversation helper for the control level conversation for coordinating the test through. */ - ConversationFactory conversationFactory; - - /** Holds the connection that the control conversation is held over. */ - Connection connection; - - /** Holds the underlying {@link CoordinatingTestCase}s that this decorator wraps. */ - WrappedSuiteTestDecorator testSuite; - - /** Hold the current running test case. */ - CoordinatingTestCase _currentTest = null; - - /** - * Creates a wrapped suite test decorator from another one. - * - * @param suite The test suite. - * @param availableClients The list of all clients that responded to the compulsory invite. - * @param controlConversation The conversation helper for the control level, test coordination conversation. - * @param controlConnection The connection that the coordination messages are sent over. - */ - public ListeningTestDecorator(WrappedSuiteTestDecorator suite, Set<TestClientDetails> availableClients, - ConversationFactory controlConversation, Connection controlConnection) - { - super(suite); - - log.debug("public InvitingTestDecorator(WrappedSuiteTestDecorator suite, Set<TestClientDetails> allClients = " - + availableClients + ", ConversationHelper controlConversation = " + controlConversation + "): called"); - - testSuite = suite; - allClients = availableClients; - conversationFactory = controlConversation; - connection = controlConnection; - } - - /** - * Broadcasts a test invitation and accetps enlisting from participating clients. The wrapped test case is then - * repeated for every combination of test clients (provided the wrapped test case extends {@link - * CoordinatingTestCase}. - * - * <p/>Any JMSExceptions during the invite/enlist conversation will be allowed to fall through as runtime - * exceptions, resulting in the non-completion of the test run. - * - * @param testResult The the results object to monitor the test results with. - * - * @todo Better error recovery for failure of the invite/enlist conversation could be added. - */ - public void run(TestResult testResult) - { - log.debug("public void run(TestResult testResult): called"); - - Collection<Test> tests = testSuite.getAllUnderlyingTests(); - - for (Test test : tests) - { - CoordinatingTestCase coordTest = (CoordinatingTestCase) test; - - Set<TestClientDetails> enlists = signupClients(coordTest); - - if (enlists.size() == 0) - { - throw new RuntimeException("No clients to test with"); - } - - Iterator<TestClientDetails> clients = enlists.iterator(); - coordTest.setSender(clients.next()); - - while (clients.hasNext()) - { - // Set the sending and receiving client details on the test case. - coordTest.setReceiver(clients.next()); - } - - // Pass down the connection to hold the coordination conversation over. - coordTest.setConversationFactory(conversationFactory); - - - if (coordTest instanceof ListeningCoordinatorTest) - { - _currentTest = coordTest; - } - // Execute the test case. - coordTest.run(testResult); - - _currentTest = null; - } - } - - private Set<TestClientDetails> signupClients(CoordinatingTestCase coordTest) - { - // Broadcast the invitation to find out what clients are available to test. - Set<TestClientDetails> enlists; - try - { - Message invite = conversationFactory.getSession().createMessage(); - Destination controlTopic = conversationFactory.getSession().createTopic("iop.control"); - ConversationFactory.Conversation conversation = conversationFactory.startConversation(); - - invite.setStringProperty("CONTROL_TYPE", "INVITE"); - invite.setStringProperty("TEST_NAME", coordTest.getTestCaseNameForTestMethod(coordTest.getName())); - - conversation.send(controlTopic, invite); - - // Wait for a short time, to give test clients an opportunity to reply to the invitation. - Collection<Message> replies = conversation.receiveAll(allClients.size(), 5000); - - log.debug("Received " + replies.size() + " enlist replies"); - - enlists = Coordinator.extractEnlists(replies); - - //Create topic to listen on for latejoiners - Destination listenTopic = conversationFactory.getSession().createTopic("iop.control.test." + coordTest.getTestCaseNameForTestMethod(coordTest.getName())); - - //Listen for joiners - conversationFactory.getSession().createConsumer(listenTopic).setMessageListener(this); - log.debug("Created consumer on :" + listenTopic); - } - catch (JMSException e) - { - throw new RuntimeException("There was a JMSException during the invite/enlist conversation.", e); - } - - return enlists; - } - - /** - * Prints a string summarizing this test decorator, mainly for debugging purposes. - * - * @return String representation for debugging purposes. - */ - public String toString() - { - return "ListeningTestDecorator: [ testSuite = " + testSuite + " ]"; - } - - - public void onMessage(Message message) - { - try - { - if (message.getStringProperty("CONTROL_TYPE").equals("LATEJOIN")) - { - ((ListeningCoordinatorTest) _currentTest).latejoin(message); - } - } - catch (JMSException e) - { - log.debug("Unable to process message:" + message); - } - } -} +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop.coordinator;
+
+import junit.framework.Test;
+import junit.framework.TestResult;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.util.ConversationFactory;
+
+import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * FanOutTestDecorator is an {@link InvitingTestDecorator} that runs one test client in the sender role, and the remainder
+ * in the receiver role. It also has the capability to listen for new test cases joining the test beyond the initial start
+ * point. This feature can be usefull when experimenting with adding more load, in the form of more test clients, to assess
+ * its impact on a running test.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Execute coordinated test cases. <td> {@link InteropTestCase}
+ * <tr><td> Accept test clients joining a running test.
+ * </table>
+ */
+public class FanOutTestDecorator extends InvitingTestDecorator implements MessageListener
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(FanOutTestDecorator.class);
+
+ /** Holds the currently running test case. */
+ InteropTestCase currentTest = null;
+
+ /**
+ * Creates a wrapped suite test decorator from another one.
+ *
+ * @param suite The test suite.
+ * @param availableClients The list of all clients that responded to the compulsory invite.
+ * @param controlConversation The conversation helper for the control level, test coordination conversation.
+ * @param controlConnection The connection that the coordination messages are sent over.
+ */
+ public FanOutTestDecorator(WrappedSuiteTestDecorator suite, Set<TestClientDetails> availableClients,
+ ConversationFactory controlConversation, Connection controlConnection)
+ {
+ super(suite, availableClients, controlConversation, controlConnection);
+
+ log.debug("public InvitingTestDecorator(WrappedSuiteTestDecorator suite, Set<TestClientDetails> allClients = "
+ + availableClients + ", ConversationHelper controlConversation = " + controlConversation + "): called");
+
+ testSuite = suite;
+ allClients = availableClients;
+ conversationFactory = controlConversation;
+ connection = controlConnection;
+ }
+
+ /**
+ * Broadcasts a test invitation and accepts enlists from participating clients. The wrapped test cases are run
+ * with one test client in the sender role, and the remaining test clients in the receiving role.
+ *
+ * <p/>Any JMSExceptions during the invite/enlist conversation will be allowed to fall through as runtime
+ * exceptions, resulting in the non-completion of the test run.
+ *
+ * @param testResult The the results object to monitor the test results with.
+ *
+ * @todo Better error recovery for failure of the invite/enlist conversation could be added.
+ */
+ public void run(TestResult testResult)
+ {
+ log.debug("public void run(TestResult testResult): called");
+
+ Collection<Test> tests = testSuite.getAllUnderlyingTests();
+
+ // Listen for late joiners on the control topic.
+ try
+ {
+ conversationFactory.getSession().createConsumer(controlTopic).setMessageListener(this);
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException("Unable to set up the message listener on the control topic.", e);
+ }
+
+ // Run all of the test cases in the test suite.
+ for (Test test : tests)
+ {
+ InteropTestCase coordTest = (InteropTestCase) test;
+
+ // Get all of the clients able to participate in the test.
+ Set<TestClientDetails> enlists = signupClients(coordTest);
+
+ // Check that there were some clients available.
+ if (enlists.size() == 0)
+ {
+ throw new RuntimeException("No clients to test with");
+ }
+
+ // Set up the first client in the sender role, and the remainder in the receiver role.
+ Iterator<TestClientDetails> clients = enlists.iterator();
+ coordTest.setSender(clients.next());
+
+ while (clients.hasNext())
+ {
+ // Set the sending and receiving client details on the test case.
+ coordTest.setReceiver(clients.next());
+ }
+
+ // Pass down the connection to hold the coordinating conversation over.
+ coordTest.setConversationFactory(conversationFactory);
+
+ // If the current test case is a drop-in test, set it up as the currently running test for late joiners to
+ // add in to. Otherwise the current test field is set to null, to indicate that late joiners are not allowed.
+ currentTest = (coordTest instanceof DropInTest) ? coordTest : null;
+
+ // Execute the test case.
+ coordTest.run(testResult);
+
+ currentTest = null;
+ }
+ }
+
+ /**
+ * Listens to incoming messages on the control topic. If the messages are 'join' messages, signalling a new
+ * test client wishing to join the current test, then the new client will be added to the current test in the
+ * receiver role.
+ *
+ * @param message The incoming control message.
+ */
+ public void onMessage(Message message)
+ {
+ try
+ {
+ // Check if the message is from a test client attempting to join a running test, and join it to the current
+ // test case if so.
+ if (message.getStringProperty("CONTROL_TYPE").equals("JOIN") && (currentTest != null))
+ {
+ ((DropInTest) currentTest).lateJoin(message);
+ }
+ }
+ // There is not a lot can be done with this error, so it is deliberately ignored.
+ catch (JMSException e)
+ {
+ log.debug("Unable to process message:" + message);
+ }
+ }
+
+ /**
+ * Prints a string summarizing this test decorator, mainly for debugging purposes.
+ *
+ * @return String representation for debugging purposes.
+ */
+ public String toString()
+ {
+ return "FanOutTestDecorator: [ testSuite = " + testSuite + " ]";
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InteropTestCase.java index d2042be741..f895b781f0 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InteropTestCase.java @@ -24,6 +24,7 @@ import junit.framework.TestCase; import org.apache.log4j.Logger;
+import org.apache.qpid.test.framework.TestUtils;
import org.apache.qpid.util.ConversationFactory;
import javax.jms.*;
@@ -31,7 +32,7 @@ import javax.jms.*; import java.util.Map;
/**
- * A CoordinatingTestCase is a JUnit test case extension that knows how to coordinate test clients that take part in a
+ * A InteropTestCase is a JUnit test case extension that knows how to coordinate test clients that take part in a
* test case as defined in the interop testing specification
* (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification).
*
@@ -64,10 +65,10 @@ import java.util.Map; * <tr><td> Supply test properties
* </table>
*/
-public abstract class CoordinatingTestCase extends TestCase
+public abstract class InteropTestCase extends TestCase
{
/** Used for debugging. */
- private static final Logger log = Logger.getLogger(CoordinatingTestCase.class);
+ private static final Logger log = Logger.getLogger(InteropTestCase.class);
/** Holds the contact details for the sending test client. */
protected TestClientDetails sender;
@@ -83,7 +84,7 @@ public abstract class CoordinatingTestCase extends TestCase *
* @param name The test case name.
*/
- public CoordinatingTestCase(String name)
+ public InteropTestCase(String name)
{
super(name);
}
@@ -152,9 +153,10 @@ public abstract class CoordinatingTestCase extends TestCase }
/**
- * Should provide a translation from the junit method name of a test to its test case name as defined in the
- * interop testing specification. For example the method "testP2P" might map onto the interop test case name
- * "TC2_BasicP2P".
+ * Should provide a translation from the junit method name of a test to its test case name as known to the test
+ * clients that will run the test. The purpose of this is to convert the JUnit method name into the correct test
+ * case name to place into the test invite. For example the method "testP2P" might map onto the interop test case
+ * name "TC2_BasicP2P".
*
* @param methodName The name of the JUnit test method.
*
@@ -222,13 +224,7 @@ public abstract class CoordinatingTestCase extends TestCase // Wait for the test sender to return its report.
Message senderReport = senderConversation.receive();
-
- try
- {
- Thread.sleep(500);
- }
- catch (InterruptedException e)
- { }
+ TestUtils.pause(500);
// Ask the receiver for its report.
Message statusRequest = session.createMessage();
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InteropTestDecorator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InteropTestDecorator.java new file mode 100644 index 0000000000..85d127110d --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InteropTestDecorator.java @@ -0,0 +1,184 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop.coordinator;
+
+import junit.framework.Test;
+import junit.framework.TestResult;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.util.ConversationFactory;
+
+import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
+
+import javax.jms.Connection;
+
+import java.util.*;
+
+/**
+ * InvitingTestDecorator is a test decorator, written to implement the interop test specification. Given a list
+ * of enlisted test clients, that are available to run interop tests, this decorator invites them to participate
+ * in each test in the wrapped test suite. Amongst all the clients that respond to the invite, all pairs are formed,
+ * and each pairing (in both directions, but excluding the reflexive pairings) is split into a sender and receiver
+ * role and a test case run between them. Any enlisted combinations that do not accept a test invite are automatically
+ * failed.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Broadcast test invitations and collect enlists. <td> {@link org.apache.qpid.util.ConversationFactory}.
+ * <tr><td> Output test failures for clients unwilling to run the test case. <td> {@link Coordinator}
+ * <tr><td> Execute coordinated test cases. <td> {@link InteropTestCase}
+ * <tr><td> Fail non participating pairings. <td> {@link OptOutTestCase}
+ * </table>
+ */
+public class InteropTestDecorator extends InvitingTestDecorator
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(InteropTestDecorator.class);
+
+ /**
+ * Creates a wrapped suite test decorator from another one.
+ *
+ * @param suite The test suite.
+ * @param availableClients The list of all clients that responded to the compulsory invite.
+ * @param controlConversation The conversation helper for the control level, test coordination conversation.
+ * @param controlConnection The connection that the coordination messages are sent over.
+ */
+ public InteropTestDecorator(WrappedSuiteTestDecorator suite, Set<TestClientDetails> availableClients,
+ ConversationFactory controlConversation, Connection controlConnection)
+ {
+ super(suite, availableClients, controlConversation, controlConnection);
+ }
+
+ /**
+ * Broadcasts a test invitation and accetps enlisting from participating clients. The wrapped test case is
+ * then repeated for every combination of test clients (provided the wrapped test case extends
+ * {@link InteropTestCase}.
+ *
+ * <p/>Any JMSExceptions during the invite/enlist conversation will be allowed to fall through as runtime exceptions,
+ * resulting in the non-completion of the test run.
+ *
+ * @todo Better error recovery for failure of the invite/enlist conversation could be added.
+ *
+ * @param testResult The the results object to monitor the test results with.
+ */
+ public void run(TestResult testResult)
+ {
+ log.debug("public void run(TestResult testResult): called");
+
+ Collection<Test> tests = testSuite.getAllUnderlyingTests();
+
+ for (Test test : tests)
+ {
+ InteropTestCase coordTest = (InteropTestCase) test;
+
+ // Broadcast the invitation to find out what clients are available to test.
+ Set<TestClientDetails> enlists = signupClients(coordTest);
+
+ // Compare the list of willing clients to the list of all available.
+ Set<TestClientDetails> optOuts = new HashSet<TestClientDetails>(allClients);
+ optOuts.removeAll(enlists);
+
+ // Output test failures for clients that will not particpate in the test.
+ Set<List<TestClientDetails>> failPairs = allPairs(optOuts, allClients);
+
+ for (List<TestClientDetails> failPair : failPairs)
+ {
+ InteropTestCase failTest = new OptOutTestCase("testOptOut");
+ failTest.setSender(failPair.get(0));
+ failTest.setReceiver(failPair.get(1));
+
+ failTest.run(testResult);
+ }
+
+ // Loop over all combinations of clients, willing to run the test.
+ Set<List<TestClientDetails>> enlistedPairs = allPairs(enlists, enlists);
+
+ for (List<TestClientDetails> enlistedPair : enlistedPairs)
+ {
+ // Set the sending and receiving client details on the test case.
+ coordTest.setSender(enlistedPair.get(0));
+ coordTest.setReceiver(enlistedPair.get(1));
+
+ // Pass down the connection to hold the coordination conversation over.
+ coordTest.setConversationFactory(conversationFactory);
+
+ // Execute the test case.
+ coordTest.run(testResult);
+ }
+ }
+ }
+
+ /**
+ * Produces all pairs of combinations of elements from two sets. The ordering of the elements in the pair is
+ * important, that is the pair <l, r> is distinct from <r, l>; both pairs are generated. For any element, i, in
+ * both the left and right sets, the reflexive pair <i, i> is not generated.
+ *
+ * @param left The left set.
+ * @param right The right set.
+ * @param <E> The type of the content of the pairs.
+ *
+ * @return All pairs formed from the permutations of all elements of the left and right sets.
+ */
+ private <E> Set<List<E>> allPairs(Set<E> left, Set<E> right)
+ {
+ log.debug("private <E> Set<List<E>> allPairs(Set<E> left = " + left + ", Set<E> right = " + right + "): called");
+
+ Set<List<E>> results = new HashSet<List<E>>();
+
+ // Form all pairs from left to right.
+ // Form all pairs from right to left.
+ for (E le : left)
+ {
+ for (E re : right)
+ {
+ if (!le.equals(re))
+ {
+ results.add(new Pair<E>(le, re));
+ results.add(new Pair<E>(re, le));
+ }
+ }
+ }
+
+ log.debug("results = " + results);
+
+ return results;
+ }
+
+ /**
+ * A simple implementation of a pair, using a list.
+ */
+ private class Pair<T> extends ArrayList<T>
+ {
+ /**
+ * Creates a new pair of elements.
+ *
+ * @param first The first element.
+ * @param second The second element.
+ */
+ public Pair(T first, T second)
+ {
+ super();
+ super.add(first);
+ super.add(second);
+ }
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java index 8695f7f66f..1225d74fbf 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java @@ -20,14 +20,6 @@ */
package org.apache.qpid.interop.coordinator;
-import java.util.*;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-import junit.framework.Test;
import junit.framework.TestResult;
import org.apache.log4j.Logger;
@@ -36,16 +28,26 @@ import org.apache.qpid.util.ConversationFactory; import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import java.util.*;
+
/**
+ * InvitingTestDecorator is a base class for writing test decorators that invite test clients to participate in
+ * distributed test cases. It provides a helper method, {@link #signupClients(InteropTestCase)}, that broadcasts
+ * an invitation and return the set of test clients that are available to particiapte in the test.
+ *
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Broadcast test invitations and collect enlists. <td> {@link ConversationFactory}.
- * <tr><td> Output test failures for clients unwilling to run the test case. <td> {@link Coordinator}
- * <tr><td> Execute coordinated test cases. <td> {@link CoordinatingTestCase}
* </table>
*/
-public class InvitingTestDecorator extends WrappedSuiteTestDecorator
+public abstract class InvitingTestDecorator extends WrappedSuiteTestDecorator
{
+ /** Used for debugging. */
private static final Logger log = Logger.getLogger(InvitingTestDecorator.class);
/** Holds the contact information for all test clients that are available and that may take part in the test. */
@@ -57,9 +59,12 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator /** Holds the connection that the control conversation is held over. */
Connection connection;
- /** Holds the underlying {@link CoordinatingTestCase}s that this decorator wraps. */
+ /** Holds the underlying {@link InteropTestCase}s that this decorator wraps. */
WrappedSuiteTestDecorator testSuite;
+ /** Holds the control topic, on which test invitations are broadcast. */
+ protected Destination controlTopic;
+
/**
* Creates a wrapped suite test decorator from another one.
*
@@ -80,141 +85,67 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator allClients = availableClients;
conversationFactory = controlConversation;
connection = controlConnection;
- }
- /**
- * Broadcasts a test invitation and accetps enlisting from participating clients. The wrapped test case is
- * then repeated for every combination of test clients (provided the wrapped test case extends
- * {@link CoordinatingTestCase}.
- *
- * <p/>Any JMSExceptions during the invite/enlist conversation will be allowed to fall through as runtime exceptions,
- * resulting in the non-completion of the test run.
- *
- * @todo Better error recovery for failure of the invite/enlist conversation could be added.
- *
- * @param testResult The the results object to monitor the test results with.
- */
- public void run(TestResult testResult)
- {
- log.debug("public void run(TestResult testResult): called");
-
- Collection<Test> tests = testSuite.getAllUnderlyingTests();
-
- for (Test test : tests)
+ // Set up the test control topic.
+ try
+ {
+ controlTopic = conversationFactory.getSession().createTopic("iop.control");
+ }
+ catch (JMSException e)
{
- CoordinatingTestCase coordTest = (CoordinatingTestCase) test;
-
- // Broadcast the invitation to find out what clients are available to test.
- Set<TestClientDetails> enlists;
- try
- {
- Message invite = conversationFactory.getSession().createMessage();
- Destination controlTopic = conversationFactory.getSession().createTopic("iop.control");
- ConversationFactory.Conversation conversation = conversationFactory.startConversation();
-
- invite.setStringProperty("CONTROL_TYPE", "INVITE");
- invite.setStringProperty("TEST_NAME", coordTest.getTestCaseNameForTestMethod(coordTest.getName()));
-
- conversation.send(controlTopic, invite);
-
- // Wait for a short time, to give test clients an opportunity to reply to the invitation.
- Collection<Message> replies = conversation.receiveAll(allClients.size(), 3000);
- enlists = Coordinator.extractEnlists(replies);
- }
- catch (JMSException e)
- {
- throw new RuntimeException("There was a JMSException during the invite/enlist conversation.", e);
- }
-
- // Compare the list of willing clients to the list of all available.
- Set<TestClientDetails> optOuts = new HashSet<TestClientDetails>(allClients);
- optOuts.removeAll(enlists);
-
- // Output test failures for clients that will not particpate in the test.
- Set<List<TestClientDetails>> failPairs = allPairs(optOuts, allClients);
-
- for (List<TestClientDetails> failPair : failPairs)
- {
- CoordinatingTestCase failTest = new OptOutTestCase("testOptOut");
- failTest.setSender(failPair.get(0));
- failTest.setReceiver(failPair.get(1));
-
- failTest.run(testResult);
- }
-
- // Loop over all combinations of clients, willing to run the test.
- Set<List<TestClientDetails>> enlistedPairs = allPairs(enlists, enlists);
-
- for (List<TestClientDetails> enlistedPair : enlistedPairs)
- {
- // Set the sending and receiving client details on the test case.
- coordTest.setSender(enlistedPair.get(0));
- coordTest.setReceiver(enlistedPair.get(1));
-
- // Pass down the connection to hold the coordination conversation over.
- coordTest.setConversationFactory(conversationFactory);
-
- // Execute the test case.
- coordTest.run(testResult);
- }
+ throw new RuntimeException("Unable to create the coordinating control topic to broadcast test invites on.", e);
}
}
/**
- * Prints a string summarizing this test decorator, mainly for debugging purposes.
+ * Should run all of the tests in the wrapped test suite.
*
- * @return String representation for debugging purposes.
+ * @param testResult The the results object to monitor the test results with.
*/
- public String toString()
- {
- return "InvitingTestDecorator: [ testSuite = " + testSuite + " ]";
- }
+ public abstract void run(TestResult testResult);
/**
- * Produces all pairs of combinations of elements from two sets. The ordering of the elements in the pair is
- * important, that is the pair <l, r> is distinct from <r, l>; both pairs are generated. For any element, i, in
- * both the left and right sets, the reflexive pair <i, i> is not generated.
+ * Broadcasts an invitation to participate in a coordinating test case to find out what clients are available to
+ * run the test case.
*
- * @param left The left set.
- * @param right The right set.
+ * @param coordTest The coordinating test case to broadcast an inviate for.
*
- * @return All pairs formed from the permutations of all elements of the left and right sets.
+ * @return A set of test clients that accepted the invitation.
*/
- private <E> Set<List<E>> allPairs(Set<E> left, Set<E> right)
+ protected Set<TestClientDetails> signupClients(InteropTestCase coordTest)
{
- log.debug("private <E> Set<List<E>> allPairs(Set<E> left = " + left + ", Set<E> right = " + right + "): called");
+ // Broadcast the invitation to find out what clients are available to test.
+ Set<TestClientDetails> enlists;
+ try
+ {
+ Message invite = conversationFactory.getSession().createMessage();
- Set<List<E>> results = new HashSet<List<E>>();
+ ConversationFactory.Conversation conversation = conversationFactory.startConversation();
- // Form all pairs from left to right.
- // Form all pairs from right to left.
- for (E le : left)
+ invite.setStringProperty("CONTROL_TYPE", "INVITE");
+ invite.setStringProperty("TEST_NAME", coordTest.getTestCaseNameForTestMethod(coordTest.getName()));
+
+ conversation.send(controlTopic, invite);
+
+ // Wait for a short time, to give test clients an opportunity to reply to the invitation.
+ Collection<Message> replies = conversation.receiveAll(allClients.size(), 3000);
+ enlists = Coordinator.extractEnlists(replies);
+ }
+ catch (JMSException e)
{
- for (E re : right)
- {
- if (!le.equals(re))
- {
- results.add(new Pair<E>(le, re));
- results.add(new Pair<E>(re, le));
- }
- }
+ throw new RuntimeException("There was a JMSException during the invite/enlist conversation.", e);
}
- log.debug("results = " + results);
-
- return results;
+ return enlists;
}
/**
- * A simple implementation of a pair, using a list.
+ * Prints a string summarizing this test decorator, mainly for debugging purposes.
+ *
+ * @return String representation for debugging purposes.
*/
- private class Pair<T> extends ArrayList<T>
+ public String toString()
{
- public Pair(T first, T second)
- {
- super();
- super.add(first);
- super.add(second);
- }
+ return "InvitingTestDecorator: [ testSuite = " + testSuite + " ]";
}
}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java deleted file mode 100644 index 1b4461f8c2..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.interop.coordinator; - -import javax.jms.Message; - -public interface ListeningCoordinatorTest -{ - public void latejoin(Message message); -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java index 42a382a898..4332aaf55c 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java @@ -32,7 +32,7 @@ import junit.framework.Assert; * <tr><td> Fail the test with a suitable reason.
* </table>
*/
-public class OptOutTestCase extends CoordinatingTestCase
+public class OptOutTestCase extends InteropTestCase
{
/**
* Creates a new coordinating test case with the specified name.
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java index c4a9d39cd8..742375b7bd 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java @@ -21,8 +21,12 @@ package org.apache.qpid.interop.coordinator;
/**
+ * TestClientDetails is used to encapsulate information about an interop test client. It pairs together the unique
+ * name of the client, and the route on which it listens to its control messages.
+ *
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Record test clients control addresses together with their names.
* </table>
*/
public class TestClientDetails
@@ -56,13 +60,8 @@ public class TestClientDetails final TestClientDetails testClientDetails = (TestClientDetails) o;
- if ((clientName != null) ? (!clientName.equals(testClientDetails.clientName))
- : (testClientDetails.clientName != null))
- {
- return false;
- }
-
- return true;
+ return !((clientName != null) ? (!clientName.equals(testClientDetails.clientName))
+ : (testClientDetails.clientName != null));
}
/**
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java index 747ba0dd0b..74c86b1d83 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java @@ -18,14 +18,8 @@ * under the License.
*
*/
-
package org.apache.qpid.interop.coordinator;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.Writer;
-import java.util.*;
-
import junit.framework.AssertionFailedError;
import junit.framework.Test;
import junit.framework.TestCase;
@@ -34,6 +28,11 @@ import org.apache.log4j.Logger; import uk.co.thebadgerset.junit.extensions.listeners.TKTestListener;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Writer;
+import java.util.*;
+
/**
* Listens for test results for a named test and outputs these in the standard JUnit XML format to the specified
* writer.
@@ -50,6 +49,12 @@ import uk.co.thebadgerset.junit.extensions.listeners.TKTestListener; *
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Listen to test lifecycle notifications.
+ * <tr><td> Listen to test errors and failures.
+ * <tr><td> Listen to test timings.
+ * <tr><td> Listen to test memory usages.
+ * <tr><td> Listen to parameterized test parameters.
+ * <tr><th> Responsibilities
* </table>
*
* @todo Merge this class with CSV test listener, making the collection of results common to both, and only factoring
@@ -97,7 +102,8 @@ public class XMLTestListener implements TKTestListener /**
* Creates a new XML results output listener that writes to the specified location.
*
- * @param writer The location to write results to.
+ * @param writer The location to write results to.
+ * @param testClassName The name of the test class to include in the test results.
*/
public XMLTestListener(Writer writer, String testClassName)
{
@@ -126,7 +132,9 @@ public class XMLTestListener implements TKTestListener }
/**
- * A test started.
+ * Notification that a test started.
+ *
+ * @param test The test that started.
*/
public void startTest(Test test)
{
@@ -189,7 +197,9 @@ public class XMLTestListener implements TKTestListener { }
/**
- * A test ended.
+ * Notification that a test ended.
+ *
+ * @param test The test that ended.
*/
public void endTest(Test test)
{
@@ -225,6 +235,9 @@ public class XMLTestListener implements TKTestListener /**
* An error occurred.
+ *
+ * @param test The test in which the error occurred.
+ * @param t The throwable that resulted from the error.
*/
public void addError(Test test, Throwable t)
{
@@ -237,6 +250,9 @@ public class XMLTestListener implements TKTestListener /**
* A failure occurred.
+ *
+ * @param test The test in which the failure occurred.
+ * @param t The JUnit assertions that led to the failure.
*/
public void addFailure(Test test, AssertionFailedError t)
{
@@ -339,13 +355,10 @@ public class XMLTestListener implements TKTestListener */
protected static class Result
{
- public Result(String testClass, String testName)
- {
- this.testClass = testClass;
- this.testName = testName;
- }
-
+ /** Holds the name of the test class. */
public String testClass;
+
+ /** Holds the name of the test method. */
public String testName;
/** Holds the exception that caused error in this test. */
@@ -354,49 +367,16 @@ public class XMLTestListener implements TKTestListener /** Holds the assertion exception that caused failure in this test. */
public AssertionFailedError failure;
- /** Holds the error count for this test. */
- // public int errors = 0;
-
- /** Holds the failure count for this tests. */
- // public int failures = 0;
-
- /** Holds the overall tests run count for this test. */
- // public int runs = 0;
-
- /*public boolean equals(Object o)
+ /**
+ * Creates a placeholder for the results of a test.
+ *
+ * @param testClass The test class.
+ * @param testName The name of the test that was run.
+ */
+ public Result(String testClass, String testName)
{
- if (this == o)
- {
- return true;
- }
-
- if (!(o instanceof Result))
- {
- return false;
- }
-
- final Result result = (Result) o;
-
- if ((testClass != null) ? (!testClass.equals(result.testClass)) : (result.testClass != null))
- {
- return false;
- }
-
- if ((testName != null) ? (!testName.equals(result.testName)) : (result.testName != null))
- {
- return false;
- }
-
- return true;
+ this.testClass = testClass;
+ this.testName = testName;
}
-
- public int hashCode()
- {
- int result;
- result = ((testClass != null) ? testClass.hashCode() : 0);
- result = (29 * result) + ((testName != null) ? testName.hashCode() : 0);
-
- return result;
- }*/
}
}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase1DummyRun.java index e642ef792b..b74a55d964 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase1DummyRun.java @@ -18,44 +18,48 @@ * under the License.
*
*/
-
package org.apache.qpid.interop.coordinator.testcases;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.Message;
-
import junit.framework.Assert;
import org.apache.log4j.Logger;
-import org.apache.qpid.interop.coordinator.CoordinatingTestCase;
+import org.apache.qpid.interop.coordinator.InteropTestCase;
+
+import javax.jms.Message;
+
+import java.util.HashMap;
+import java.util.Map;
/**
+ * Coordinates test case 1, from the interop test specification. This test connects up the sender and receiver roles,
+ * and gets some dummy test reports from them, in order to check that the test framework itself is operational.
+ *
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Exercises the interop testing framework without actually sending any test messages.
- * <td> {@link org.apache.qpid.interop.coordinator.CoordinatingTestCase}
+ * <td> {@link org.apache.qpid.interop.coordinator.InteropTestCase}
* </table>
*/
-public class CoordinatingTestCase1DummyRun extends CoordinatingTestCase
+public class InteropTestCase1DummyRun extends InteropTestCase
{
/** Used for debugging. */
- private static final Logger log = Logger.getLogger(CoordinatingTestCase1DummyRun.class);
+ private static final Logger log = Logger.getLogger(InteropTestCase1DummyRun.class);
/**
* Creates a new coordinating test case with the specified name.
*
* @param name The test case name.
*/
- public CoordinatingTestCase1DummyRun(String name)
+ public InteropTestCase1DummyRun(String name)
{
super(name);
}
/**
* Performs the basic P2P test case, "Test Case 2" in the specification.
+ *
+ * @throws Exception Any exceptions are allowed to fall through and fail the test.
*/
public void testDummyRun() throws Exception
{
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase2BasicP2P.java index b1b2d9f847..406b8b42a6 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase2BasicP2P.java @@ -18,43 +18,48 @@ * under the License.
*
*/
-
package org.apache.qpid.interop.coordinator.testcases;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.Message;
-
import junit.framework.Assert;
import org.apache.log4j.Logger;
-import org.apache.qpid.interop.coordinator.CoordinatingTestCase;
+import org.apache.qpid.interop.coordinator.InteropTestCase;
+
+import javax.jms.Message;
+
+import java.util.HashMap;
+import java.util.Map;
/**
+ * Implements test case 2, from the interop test specification. This test sets up the TC2_BasicP2P test for 50
+ * messages. It checks that the sender and receiver reports both indicate that all the test messages were transmitted
+ * successfully.
+ *
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Setup p2p test parameters and compare with test output. <td> {@link CoordinatingTestCase}
+ * <tr><td> Setup p2p test parameters and compare with test output. <td> {@link InteropTestCase}
* </table>
*/
-public class CoordinatingTestCase2BasicP2P extends CoordinatingTestCase
+public class InteropTestCase2BasicP2P extends InteropTestCase
{
/** Used for debugging. */
- private static final Logger log = Logger.getLogger(CoordinatingTestCase2BasicP2P.class);
+ private static final Logger log = Logger.getLogger(InteropTestCase2BasicP2P.class);
/**
* Creates a new coordinating test case with the specified name.
*
* @param name The test case name.
*/
- public CoordinatingTestCase2BasicP2P(String name)
+ public InteropTestCase2BasicP2P(String name)
{
super(name);
}
/**
* Performs the basic P2P test case, "Test Case 2" in the specification.
+ *
+ * @throws Exception Any exceptions are allowed to fall through and fail the test.
*/
public void testBasicP2P() throws Exception
{
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase3BasicPubSub.java index 702c240e9a..ebb4cd764e 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase3BasicPubSub.java @@ -18,43 +18,44 @@ * under the License.
*
*/
-
package org.apache.qpid.interop.coordinator.testcases;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.Message;
-
import junit.framework.Assert;
import org.apache.log4j.Logger;
-import org.apache.qpid.interop.coordinator.CoordinatingTestCase;
+import org.apache.qpid.interop.coordinator.InteropTestCase;
+
+import javax.jms.Message;
+
+import java.util.HashMap;
+import java.util.Map;
/**
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Setup pub/sub test parameters and compare with test output. <td> {@link CoordinatingTestCase}
+ * <tr><td> Setup pub/sub test parameters and compare with test output. <td> {@link InteropTestCase}
* </table>
*/
-public class CoordinatingTestCase3BasicPubSub extends CoordinatingTestCase
+public class InteropTestCase3BasicPubSub extends InteropTestCase
{
/** Used for debugging. */
- private static final Logger log = Logger.getLogger(CoordinatingTestCase3BasicPubSub.class);
+ private static final Logger log = Logger.getLogger(InteropTestCase3BasicPubSub.class);
/**
* Creates a new coordinating test case with the specified name.
*
* @param name The test case name.
*/
- public CoordinatingTestCase3BasicPubSub(String name)
+ public InteropTestCase3BasicPubSub(String name)
{
super(name);
}
/**
* Performs the basic P2P test case, "Test Case 2" in the specification.
+ *
+ * @throws Exception Any exceptions are allowed to fall through and fail the test.
*/
public void testBasicPubSub() throws Exception
{
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java index 37952d08c8..87f09faf1e 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java @@ -44,7 +44,11 @@ public interface InteropClientTestCase extends MessageListener /** Defines the possible test case roles that an interop test case can take on. */
public enum Roles
{
- SENDER, RECEIVER;
+ /** Specifies the sender role. */
+ SENDER,
+
+ /** Specifies the receiver role. */
+ RECEIVER
}
/**
@@ -78,20 +82,13 @@ public interface InteropClientTestCase extends MessageListener public void assignRole(Roles role, Message assignRoleMessage) throws JMSException;
/**
- * Performs the test case actions.
- * return from here when you have finished the test.. this will signal the controller that the test has ended.
+ * Performs the test case actions. Returning from here, indicates that the sending role has completed its test.
+ *
* @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
*/
public void start() throws JMSException;
/**
- * Gives notice of termination of the test case actions.
- *
- * @throws JMSException Any JMSException resulting from allowed to fall through.
- */
- public void terminate() throws JMSException, InterruptedException;
-
- /**
* Gets a report on the actions performed by the test case in its assigned role.
*
* @param session The session to create the report message in.
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java index a904bfa419..baf8bc033d 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java @@ -21,42 +21,37 @@ package org.apache.qpid.interop.testclient;
import org.apache.log4j.Logger;
+
import org.apache.qpid.interop.testclient.testcases.TestCase1DummyRun;
import org.apache.qpid.interop.testclient.testcases.TestCase2BasicP2P;
-import org.apache.qpid.util.CommandLineParser;
-import org.apache.qpid.util.PropertiesUtils;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import java.io.IOException;
+import org.apache.qpid.interop.testclient.testcases.TestCase3BasicPubSub;
+import org.apache.qpid.test.framework.MessagingTestConfigProperties;
+import org.apache.qpid.test.framework.TestUtils;
+
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
+
+import javax.jms.*;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Properties;
/**
* Implements a test client as described in the interop testing spec
* (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). A test client is an agent that
* reacts to control message sequences send by the test {@link org.apache.qpid.interop.coordinator.Coordinator}.
*
- * <p/><table><caption>Messages Handled by TestClient</caption>
+ * <p/><table><caption>Messages Handled by SustainedTestClient</caption>
* <tr><th> Message <th> Action
* <tr><td> Invite(compulsory) <td> Reply with Enlist.
* <tr><td> Invite(test case) <td> Reply with Enlist if test case available.
* <tr><td> AssignRole(test case) <td> Reply with Accept Role if matches an enlisted test. Keep test parameters.
* <tr><td> Start <td> Send test messages defined by test parameters. Send report on messages sent.
* <tr><td> Status Request <td> Send report on messages received.
+ * <tr><td> Terminate <td> Terminate the test client.
* </table>
*
* <p><table id="crc"><caption>CRC Card</caption>
@@ -67,12 +62,11 @@ import java.util.Properties; */
public class TestClient implements MessageListener
{
+ /** Used for debugging. */
private static Logger log = Logger.getLogger(TestClient.class);
- public static final String CONNECTION_PROPERTY = "connectionfactory.broker";
- public static final String CONNECTION_NAME = "broker";
+ /** Holds the default identifying name of the test client. */
public static final String CLIENT_NAME = "java";
- public static final String DEFAULT_CONNECTION_PROPS_RESOURCE = "org/apache/qpid/interop/connection.properties";
/** Holds the URL of the broker to run the tests on. */
public static String brokerUrl;
@@ -80,17 +74,34 @@ public class TestClient implements MessageListener /** Holds the virtual host to run the tests on. If <tt>null</tt>, then the default virtual host is used. */
public static String virtualHost;
+ /**
+ * Holds the test context properties that provides the default test parameters, plus command line overrides.
+ * This is initialized with the default test parameters, to which command line overrides may be applied.
+ */
+ public static ParsedProperties testContextProperties =
+ TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+
/** Holds all the test cases loaded from the classpath. */
Map<String, InteropClientTestCase> testCases = new HashMap<String, InteropClientTestCase>();
+ /** Holds the test case currently being run by this client. */
protected InteropClientTestCase currentTestCase;
- protected Connection _connection;
+ /** Holds the connection to the broker that the test is being coordinated on. */
+ protected Connection connection;
+
+ /** Holds the message producer to hold the test coordination over. */
protected MessageProducer producer;
+
+ /** Holds the JMS session for the test coordination. */
protected Session session;
+ /** Holds the name of this client, with a default value. */
protected String clientName = CLIENT_NAME;
+ /** This flag indicates that the test client should attempt to join the currently running test case on start up. */
+ protected boolean join;
+
/**
* Creates a new interop test client, listenting to the specified broker and virtual host, with the specified client
* identifying name.
@@ -99,15 +110,16 @@ public class TestClient implements MessageListener * @param virtualHost The virtual host to conect to.
* @param clientName The client name to use.
*/
- public TestClient(String brokerUrl, String virtualHost, String clientName)
+ public TestClient(String brokerUrl, String virtualHost, String clientName, boolean join)
{
- log.debug("public TestClient(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost
- + ", String clientName = " + clientName + "): called");
+ log.debug("public SustainedTestClient(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost
+ + ", String clientName = " + clientName + "): called");
// Retain the connection parameters.
this.brokerUrl = brokerUrl;
this.virtualHost = virtualHost;
this.clientName = clientName;
+ this.join = join;
}
/**
@@ -124,49 +136,40 @@ public class TestClient implements MessageListener */
public static void main(String[] args)
{
- // Use the command line parser to evaluate the command line.
- CommandLineParser commandLine =
- new CommandLineParser(
+ // Override the default broker url to be localhost:5672.
+ testContextProperties.setProperty(MessagingTestConfigProperties.BROKER_PROPNAME, "tcp://localhost:5672");
+
+ // Use the command line parser to evaluate the command line with standard handling behaviour (print errors
+ // and usage then exist if there are errors).
+ // Any options and trailing name=value pairs are also injected into the test context properties object,
+ // to override any defaults that may have been set up.
+ ParsedProperties options =
+ new ParsedProperties(uk.co.thebadgerset.junit.extensions.util.CommandLineParser.processCommandLine(args,
+ new uk.co.thebadgerset.junit.extensions.util.CommandLineParser(
new String[][]
- {
- {"b", "The broker URL.", "broker", "false"},
- {"h", "The virtual host to use.", "virtual host", "false"},
- {"n", "The test client name.", "name", "false"}
- });
-
- // Capture the command line arguments or display errors and correct usage and then exit.
- Properties options = null;
-
- try
- {
- options = commandLine.parseCommandLine(args);
- }
- catch (IllegalArgumentException e)
- {
- System.out.println(commandLine.getErrors());
- System.out.println(commandLine.getUsage());
- System.exit(1);
- }
+ {
+ { "b", "The broker URL.", "broker", "false" },
+ { "h", "The virtual host to use.", "virtual host", "false" },
+ { "o", "The name of the directory to output test timings to.", "dir", "false" },
+ { "n", "The name of the test client.", "name", "false" },
+ { "j", "Join this test client to running test.", "false" }
+ }), testContextProperties));
// Extract the command line options.
String brokerUrl = options.getProperty("b");
String virtualHost = options.getProperty("h");
String clientName = options.getProperty("n");
-
- // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up
- // overridden values from there.
- commandLine.addCommandLineToSysProperties();
+ boolean join = options.getPropertyAsBoolean("j");
// Create a test client and start it running.
- TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName);
+ TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName, join);
// Use a class path scanner to find all the interop test case implementations.
+ // Hard code the test classes till the classpath scanner is fixed.
Collection<Class<? extends InteropClientTestCase>> testCaseClasses =
- new ArrayList<Class<? extends InteropClientTestCase>>();
+ new ArrayList<Class<? extends InteropClientTestCase>>();
// ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true);
- // Hard code the test classes till the classpath scanner is fixed.
- Collections.addAll(testCaseClasses,
- new Class[]{TestCase1DummyRun.class, TestCase2BasicP2P.class, TestClient.class});
+ Collections.addAll(testCaseClasses, TestCase1DummyRun.class, TestCase2BasicP2P.class, TestCase3BasicPubSub.class);
try
{
@@ -182,7 +185,10 @@ public class TestClient implements MessageListener /**
* Starts the interop test client running. This causes it to start listening for incoming test invites.
*
- * @throws JMSException Any underlying JMSExceptions are allowed to fall through. @param testCaseClasses
+ * @param testCaseClasses The classes of the available test cases. The test case names from these are used to
+ * matchin incoming test invites against.
+ *
+ * @throws JMSException Any underlying JMSExceptions are allowed to fall through.
*/
protected void start(Collection<Class<? extends InteropClientTestCase>> testCaseClasses) throws JMSException
{
@@ -209,84 +215,36 @@ public class TestClient implements MessageListener }
// Open a connection to communicate with the coordinator on.
- _connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, clientName, brokerUrl, virtualHost);
-
- session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ connection = TestUtils.createConnection(testContextProperties);
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Set this up to listen for control messages.
- MessageConsumer consumer = session.createConsumer(session.createTopic("iop.control." + clientName));
+ Topic privateControlTopic = session.createTopic("iop.control." + clientName);
+ MessageConsumer consumer = session.createConsumer(privateControlTopic);
consumer.setMessageListener(this);
- MessageConsumer consumer2 = session.createConsumer(session.createTopic("iop.control"));
+ Topic controlTopic = session.createTopic("iop.control");
+ MessageConsumer consumer2 = session.createConsumer(controlTopic);
consumer2.setMessageListener(this);
// Create a producer to send replies with.
producer = session.createProducer(null);
- // Start listening for incoming control messages.
- _connection.start();
- }
-
-
- public static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost)
- {
- return createConnection(connectionPropsResource, "clientID", brokerUrl, virtualHost);
- }
-
- /**
- * Establishes a JMS connection using a properties file and qpids built in JNDI implementation. This is a simple
- * convenience method for code that does anticipate handling connection failures. All exceptions that indicate that
- * the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure handler.
- *
- * @param connectionPropsResource The name of the connection properties file.
- * @param clientID
- * @param brokerUrl The broker url to connect to, <tt>null</tt> to use the default from the
- * properties.
- * @param virtualHost The virtual host to connectio to, <tt>null</tt> to use the default.
- *
- * @return A JMS conneciton.
- *
- * @todo Make username/password configurable. Allow multiple urls for fail over. Once it feels right, move it to a
- * Utils library class.
- */
- public static Connection createConnection(String connectionPropsResource, String clientID, String brokerUrl, String virtualHost)
- {
- log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource
- + ", String brokerUrl = " + brokerUrl + ", String clientID = " + clientID
- + ", String virtualHost = " + virtualHost + " ): called");
-
- try
+ // If the join flag was set, then broadcast a join message to notify the coordinator that a new test client
+ // is available to join the current test case, if it supports it. This message may be ignored, or it may result
+ // in this test client receiving a test invite.
+ if (join)
{
- Properties connectionProps =
- PropertiesUtils.getProperties(TestClient.class.getClassLoader().getResourceAsStream(
- connectionPropsResource));
-
- if (brokerUrl != null)
- {
- String connectionString =
- "amqp://guest:guest@" + clientID + "/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'";
- connectionProps.setProperty(CONNECTION_PROPERTY, connectionString);
- }
-
- Context ctx = new InitialContext(connectionProps);
-
- ConnectionFactory cf = (ConnectionFactory) ctx.lookup(CONNECTION_NAME);
- Connection connection = cf.createConnection();
+ Message joinMessage = session.createMessage();
- return connection;
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- catch (NamingException e)
- {
- throw new RuntimeException(e);
- }
- catch (JMSException e)
- {
- throw new RuntimeException(e);
+ joinMessage.setStringProperty("CONTROL_TYPE", "JOIN");
+ joinMessage.setStringProperty("CLIENT_NAME", clientName);
+ joinMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName);
+ producer.send(controlTopic, joinMessage);
}
+
+ // Start listening for incoming control messages.
+ connection.start();
}
/**
@@ -394,16 +352,8 @@ public class TestClient implements MessageListener {
log.info("Received termination instruction from coordinator.");
-// try
-// {
-// currentTestCase.terminate();
-// }
-// catch (InterruptedException e)
-// {
-// //
-// }
// Is a cleaner shutdown needed?
- _connection.close();
+ connection.close();
System.exit(0);
}
else
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java index 5f257c0b36..9629e79b2c 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java @@ -43,8 +43,15 @@ import javax.jms.Session; */
public class TestCase1DummyRun implements InteropClientTestCase
{
+ /** Used for debugging. */
private static final Logger log = Logger.getLogger(TestCase1DummyRun.class);
+ /**
+ * Should provide the name of the test case that this class implements. The exact names are defined in the
+ * interop testing spec.
+ *
+ * @return The name of the test case that this implements.
+ */
public String getName()
{
log.debug("public String getName(): called");
@@ -52,6 +59,15 @@ public class TestCase1DummyRun implements InteropClientTestCase return "TC1_DummyRun";
}
+ /**
+ * Determines whether the test invite that matched this test case is acceptable.
+ *
+ * @param inviteMessage The invitation to accept or reject.
+ *
+ * @return <tt>true</tt> to accept the invitation, <tt>false</tt> to reject it.
+ *
+ * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
+ */
public boolean acceptInvite(Message inviteMessage) throws JMSException
{
log.debug("public boolean acceptInvite(Message inviteMessage): called");
@@ -60,6 +76,15 @@ public class TestCase1DummyRun implements InteropClientTestCase return true;
}
+ /**
+ * Assigns the role to be played by this test case. The test parameters are fully specified in the
+ * assignment message. When this method return the test case will be ready to execute.
+ *
+ * @param role The role to be played; sender or receiver.
+ * @param assignRoleMessage The role assingment message, contains the full test parameters.
+ *
+ * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
+ */
public void assignRole(Roles role, Message assignRoleMessage) throws JMSException
{
log.debug("public void assignRole(Roles role, Message assignRoleMessage): called");
@@ -67,6 +92,9 @@ public class TestCase1DummyRun implements InteropClientTestCase // Do nothing, both roles are the same.
}
+ /**
+ * Performs the test case actions. Returning from here, indicates that the sending role has completed its test.
+ */
public void start()
{
log.debug("public void start(): called");
@@ -74,11 +102,15 @@ public class TestCase1DummyRun implements InteropClientTestCase // Do nothing.
}
- public void terminate() throws JMSException
- {
- //todo
- }
-
+ /**
+ * Gets a report on the actions performed by the test case in its assigned role.
+ *
+ * @param session The session to create the report message in.
+ *
+ * @return The report message.
+ *
+ * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through.
+ */
public Message getReport(Session session) throws JMSException
{
log.debug("public Message getReport(Session session): called");
@@ -87,6 +119,11 @@ public class TestCase1DummyRun implements InteropClientTestCase return session.createTextMessage("Dummy Run, Ok.");
}
+ /**
+ * Handles incoming test messages. Does nothing.
+ *
+ * @param message The incoming test message.
+ */
public void onMessage(Message message)
{
log.debug("public void onMessage(Message message = " + message + "): called");
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java index ff56ee9b93..c93d1ab828 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java @@ -20,12 +20,13 @@ */
package org.apache.qpid.interop.testclient.testcases;
-import javax.jms.*;
-
import org.apache.log4j.Logger;
import org.apache.qpid.interop.testclient.InteropClientTestCase;
import org.apache.qpid.interop.testclient.TestClient;
+import org.apache.qpid.test.framework.TestUtils;
+
+import javax.jms.*;
/**
* Implements test case 2, basic P2P. Sends/received a specified number of messages to a specified route on the
@@ -54,9 +55,6 @@ public class TestCase2BasicP2P implements InteropClientTestCase /** The number of test messages to send. */
private int numMessages;
- /** The routing key to send them to on the default direct exchange. */
- private Destination sendDestination;
-
/** The connection to send the test messages on. */
private Connection connection;
@@ -118,14 +116,12 @@ public class TestCase2BasicP2P implements InteropClientTestCase this.role = role;
// Create a new connection to pass the test messages on.
- connection =
- TestClient.createConnection(TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, TestClient.brokerUrl,
- TestClient.virtualHost);
+ connection = TestUtils.createConnection(TestClient.testContextProperties);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Extract and retain the test parameters.
numMessages = assignRoleMessage.getIntProperty("P2P_NUM_MESSAGES");
- sendDestination = session.createQueue(assignRoleMessage.getStringProperty("P2P_QUEUE_AND_KEY_NAME"));
+ Destination sendDestination = session.createQueue(assignRoleMessage.getStringProperty("P2P_QUEUE_AND_KEY_NAME"));
log.debug("numMessages = " + numMessages);
log.debug("sendDestination = " + sendDestination);
@@ -149,7 +145,9 @@ public class TestCase2BasicP2P implements InteropClientTestCase }
/**
- * Performs the test case actions.
+ * Performs the test case actions. Returning from here, indicates that the sending role has completed its test.
+ *
+ * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
*/
public void start() throws JMSException
{
@@ -170,11 +168,6 @@ public class TestCase2BasicP2P implements InteropClientTestCase }
}
- public void terminate() throws JMSException
- {
- //todo
- }
-
/**
* Gets a report on the actions performed by the test case in its assigned role.
*
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java index 7b35142c82..57e8634006 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java @@ -18,14 +18,15 @@ * under the License.
*
*/
-
package org.apache.qpid.interop.testclient.testcases;
-import javax.jms.*;
-
import org.apache.log4j.Logger;
import org.apache.qpid.interop.testclient.InteropClientTestCase;
+import org.apache.qpid.interop.testclient.TestClient;
+import org.apache.qpid.test.framework.TestUtils;
+
+import javax.jms.*;
/**
* Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the
@@ -55,12 +56,6 @@ public class TestCase3BasicPubSub implements InteropClientTestCase /** The number of test messages to send. */
private int numMessages;
- /** The number of receiver connection to use. */
- private int numReceivers;
-
- /** The routing key to send them to on the default direct exchange. */
- private Destination sendDestination;
-
/** The connections to send/receive the test messages on. */
private Connection[] connection;
@@ -123,7 +118,7 @@ public class TestCase3BasicPubSub implements InteropClientTestCase // Extract and retain the test parameters.
numMessages = assignRoleMessage.getIntProperty("PUBSUB_NUM_MESSAGES");
- numReceivers = assignRoleMessage.getIntProperty("PUBSUB_NUM_RECEIVERS");
+ int numReceivers = assignRoleMessage.getIntProperty("PUBSUB_NUM_RECEIVERS");
String sendKey = assignRoleMessage.getStringProperty("PUBSUB_KEY");
log.debug("numMessages = " + numMessages);
@@ -139,13 +134,11 @@ public class TestCase3BasicPubSub implements InteropClientTestCase connection = new Connection[1];
session = new Session[1];
- connection[0] =
- org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl,
- org.apache.qpid.interop.testclient.TestClient.virtualHost);
+ connection[0] = TestUtils.createConnection(TestClient.testContextProperties);
session[0] = connection[0].createSession(false, Session.AUTO_ACKNOWLEDGE);
// Extract and retain the test parameters.
- sendDestination = session[0].createTopic(sendKey);
+ Destination sendDestination = session[0].createTopic(sendKey);
producer = session[0].createProducer(sendDestination);
break;
@@ -159,9 +152,7 @@ public class TestCase3BasicPubSub implements InteropClientTestCase for (int i = 0; i < numReceivers; i++)
{
- connection[i] =
- org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl,
- org.apache.qpid.interop.testclient.TestClient.virtualHost);
+ connection[i] = TestUtils.createConnection(TestClient.testContextProperties);
session[i] = connection[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
sendDestination = session[i].createTopic(sendKey);
@@ -174,14 +165,16 @@ public class TestCase3BasicPubSub implements InteropClientTestCase }
// Start all the connection dispatcher threads running.
- for (int i = 0; i < connection.length; i++)
+ for (Connection conn : connection)
{
- connection[i].start();
+ conn.start();
}
}
/**
- * Performs the test case actions.
+ * Performs the test case actions. Returning from here, indicates that the sending role has completed its test.
+ *
+ * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
*/
public void start() throws JMSException
{
@@ -202,11 +195,6 @@ public class TestCase3BasicPubSub implements InteropClientTestCase }
}
- public void terminate() throws JMSException, InterruptedException
- {
- //todo
- }
-
/**
* Gets a report on the actions performed by the test case in its assigned role.
*
@@ -221,9 +209,9 @@ public class TestCase3BasicPubSub implements InteropClientTestCase log.debug("public Message getReport(Session session): called");
// Close the test connections.
- for (int i = 0; i < connection.length; i++)
+ for (Connection conn : connection)
{
- connection[i].close();
+ conn.close();
}
// Generate a report message containing the count of the number of messages passed.
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java index 79707bafa5..71ab38ec0a 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java @@ -1,931 +1,905 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.sustained; - -import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQNoConsumersException; -import org.apache.qpid.client.AMQNoRouteException; -import org.apache.qpid.interop.testclient.testcases.TestCase3BasicPubSub; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.CountDownLatch; - -/** - * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the - * default topic exchange, using the specified number of receiver connections. Produces reports on the actual number of - * messages sent/received. - * - * <p><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Supply the name - * of the test case that this implements. <tr><td> Accept/Reject invites based on test parameters. <tr><td> Adapt to - * assigned roles. <tr><td> Send required number of test messages using pub/sub. <tr><td> Generate test reports. - * </table> - */ -public class SustainedTestClient extends TestCase3BasicPubSub implements ExceptionListener -{ - /** Used for debugging. */ - private static final Logger debugLog = Logger.getLogger(SustainedTestClient.class); - - private static final Logger log = Logger.getLogger("SustainedTest"); - - - /** The role to be played by the test. */ - private Roles role; - - /** The number of test messages to send. */ -// private int numMessages; - - /** The number of receiver connection to use. */ - private int numReceivers; - - /** The routing key to send them to on the default direct exchange. */ - private Destination sendDestination; - - /** The routing key to send updates to on the default direct exchange. */ - private Destination sendUpdateDestination; - - - /** The connections to send/receive the test messages on. */ - private Connection[] connection; - - /** The sessions to send/receive the test messages on. */ - private Session[] session; - - /** The producer to send the test messages with. */ - MessageProducer producer; - - /** Adapter that adjusts the send rate based on the updates from clients. */ - SustainedRateAdapter _rateAdapter; - - /** */ - int _batchSize; - - - private static final long TEN_MILLI_SEC = 10000000; - private static final int DEBUG_LOG_UPATE_INTERVAL = 10; - private static final int LOG_UPATE_INTERVAL = 10; - private static final boolean SLEEP_PER_MESSAGE = Boolean.getBoolean("sleepPerMessage"); - - /** - * Should provide the name of the test case that this class implements. The exact names are defined in the interop - * testing spec. - * - * @return The name of the test case that this implements. - */ - public String getName() - { - debugLog.debug("public String getName(): called"); - - return "Perf_SustainedPubSub"; - } - - /** - * Assigns the role to be played by this test case. The test parameters are fully specified in the assignment - * message. When this method return the test case will be ready to execute. - * - * @param role The role to be played; sender or receiver. - * @param assignRoleMessage The role assingment message, contains the full test parameters. - * - * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. - */ - public void assignRole(Roles role, Message assignRoleMessage) throws JMSException - { - debugLog.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage - + "): called"); - - // Take note of the role to be played. - this.role = role; - - // Extract and retain the test parameters. - numReceivers = assignRoleMessage.getIntProperty("SUSTAINED_NUM_RECEIVERS"); - _batchSize = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL"); - String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY"); - String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY"); - int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE"); - String clientName = assignRoleMessage.getStringProperty("CLIENT_NAME"); - - if (debugLog.isDebugEnabled()) - { - debugLog.debug("numReceivers = " + numReceivers); - debugLog.debug("_batchSize = " + _batchSize); - debugLog.debug("ackMode = " + ackMode); - debugLog.debug("sendKey = " + sendKey); - debugLog.debug("sendUpdateKey = " + sendUpdateKey); - debugLog.debug("role = " + role); - } - - switch (role) - { - // Check if the sender role is being assigned, and set up a single message producer if so. - case SENDER: - log.info("Creating Sender"); - // Create a new connection to pass the test messages on. - connection = new Connection[1]; - session = new Session[1]; - - connection[0] = - org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, - clientName, - org.apache.qpid.interop.testclient.TestClient.brokerUrl, - org.apache.qpid.interop.testclient.TestClient.virtualHost); - session[0] = connection[0].createSession(false, ackMode); - - // Extract and retain the test parameters. - sendDestination = session[0].createTopic(sendKey); - - connection[0].setExceptionListener(this); - - producer = session[0].createProducer(sendDestination); - - sendUpdateDestination = session[0].createTopic(sendUpdateKey); - MessageConsumer updateConsumer = session[0].createConsumer(sendUpdateDestination); - - _rateAdapter = new SustainedRateAdapter(this); - updateConsumer.setMessageListener(_rateAdapter); - - - break; - - // Otherwise the receiver role is being assigned, so set this up to listen for messages on the required number - // of receiver connections. - case RECEIVER: - log.info("Creating Receiver"); - // Create the required number of receiver connections. - connection = new Connection[numReceivers]; - session = new Session[numReceivers]; - - for (int i = 0; i < numReceivers; i++) - { - connection[i] = - org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, - clientName, - org.apache.qpid.interop.testclient.TestClient.brokerUrl, - org.apache.qpid.interop.testclient.TestClient.virtualHost); - session[i] = connection[i].createSession(false, ackMode); - - sendDestination = session[i].createTopic(sendKey); - - sendUpdateDestination = session[i].createTopic(sendUpdateKey); - - MessageConsumer consumer = session[i].createConsumer(sendDestination); - - consumer.setMessageListener(new SustainedListener(clientName + "-" + i, _batchSize, session[i], sendUpdateDestination)); - } - - break; - } - - // Start all the connection dispatcher threads running. - for (int i = 0; i < connection.length; i++) - { - connection[i].start(); - } - } - - - /** Performs the test case actions. */ - public void start() throws JMSException - { - debugLog.debug("public void start(): called"); - - // Check that the sender role is being performed. - switch (role) - { - // Check if the sender role is being assigned, and set up a single message producer if so. - case SENDER: - _rateAdapter.run(); - break; - case RECEIVER: - - } - - //return from here when you have finished the test.. this will signal the controller and - } - - public void terminate() throws JMSException, InterruptedException - { - if (_rateAdapter != null) - { - _rateAdapter.stop(); - } - } - - /** - * Gets a report on the actions performed by the test case in its assigned role. - * - * @param session The session to create the report message in. - * - * @return The report message. - * - * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. - */ - public Message getReport(Session session) throws JMSException - { - debugLog.debug("public Message getReport(Session session): called"); - - // Close the test connections. - for (int i = 0; i < connection.length; i++) - { - connection[i].close(); - } - - Message report = session.createMessage(); - report.setStringProperty("CONTROL_TYPE", "REPORT"); - - return report; - } - - public void onException(JMSException jmsException) - { - Exception linked = jmsException.getLinkedException(); - - if (linked != null) - { - if (debugLog.isDebugEnabled()) - { - debugLog.debug("Linked Exception:" + linked); - } - if ((linked instanceof AMQNoRouteException) - || (linked instanceof AMQNoConsumersException)) - { - if (debugLog.isDebugEnabled()) - { - if (linked instanceof AMQNoConsumersException) - { - debugLog.warn("No clients currently available for message:" + ((AMQNoConsumersException) linked).getUndeliveredMessage()); - } - else - { - debugLog.warn("No route for message"); - } - } - - // Tell the rate adapter that there are no clients ready yet - _rateAdapter.NO_CLIENTS = true; - } - } - else - { - debugLog.warn("Exception:" + linked); - } - } - - /** - * Inner class that listens for messages and sends a report for the time taken between receiving the 'start' and - * 'end' messages. - */ - class SustainedListener implements MessageListener - { - /** Number of messages received */ - private long _received = 0; - /** The number of messages in the batch */ - private int _batchSize = 0; - /** Record of the when the 'start' messagse was sen */ - private Long _startTime; - /** Message producer to use to send reports */ - MessageProducer _updater; - /** Session to create the report message on */ - Session _session; - /** Record of the client ID used for this SustainedListnener */ - String _client; - - - /** - * Main Constructor - * - * @param clientname The _client id used to identify this connection. - * @param batchSize The number of messages that are to be sent per batch. Note: This is not used to - * control the interval between sending reports. - * @param session The session used for communication. - * @param sendDestination The destination that update reports should be sent to. - * - * @throws JMSException My occur if creatingthe Producer fails - */ - public SustainedListener(String clientname, int batchSize, Session session, Destination sendDestination) throws JMSException - { - _batchSize = batchSize; - _client = clientname; - _session = session; - _updater = session.createProducer(sendDestination); - } - - public void onMessage(Message message) - { - if (debugLog.isTraceEnabled()) - { - debugLog.trace("Message " + _received + "received in listener"); - } - - - if (message instanceof TextMessage) - { - try - { - _received++; - if (((TextMessage) message).getText().equals("start")) - { - debugLog.debug("Starting Batch"); - _startTime = System.nanoTime(); - } - else if (((TextMessage) message).getText().equals("end")) - { - if (_startTime != null) - { - long currentTime = System.nanoTime(); - sendStatus(currentTime - _startTime, _received, message.getIntProperty("BATCH")); - debugLog.debug("End Batch"); - } - } - } - catch (JMSException e) - { - //ignore error - } - } - - } - - /** - * sendStatus creates and sends the report back to the publisher - * - * @param time taken for the the last batch - * @param received Total number of messages received. - * @param batchNumber the batch number - * @throws JMSException if an error occurs during the send - */ - private void sendStatus(long time, long received, int batchNumber) throws JMSException - { - Message updateMessage = _session.createTextMessage("update"); - updateMessage.setStringProperty("CLIENT_ID", ":" + _client); - updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE"); - updateMessage.setLongProperty("RECEIVED", received); - updateMessage.setIntProperty("BATCH", batchNumber); - updateMessage.setLongProperty("DURATION", time); - - if (debugLog.isInfoEnabled()) - { - debugLog.info("**** SENDING [" + batchNumber + "]**** " - + "CLIENT_ID:" + _client + " RECEIVED:" + received - + " BATCH:" + batchNumber + " DURATION:" + time); - } - - // Output on the main log.info the details of this batch - if (batchNumber % 10 == 0) - { - log.info("Sending Report [" + batchNumber + "] " - + "CLIENT_ID:" + _client + " RECEIVED:" + received - + " BATCH:" + batchNumber + " DURATION:" + time); - } - - _updater.send(updateMessage); - } - } - - - /** - * This class is used here to adjust the _delay value which in turn is used to control the number of messages/second - * that are sent through the test system. - * - * By keeping a record of the messages recevied and the average time taken to process the batch size can be - * calculated and so the delay can be adjusted to maintain that rate. - * - * Given that delays of < 10ms can be rounded up the delay is only used between messages if the _delay > 10ms * no - * messages in the batch. Otherwise the delay is used at the end of the batch. - */ - class SustainedRateAdapter implements MessageListener, Runnable - { - private SustainedTestClient _client; - private long _batchVariance = Integer.getInteger("batchVariance", 3); //no. batches to allow drifting - private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms) - private volatile long _delay; //in nanos - private long _sent; - private Map<String, Long> _slowClients = new HashMap<String, Long>(); - private static final long PAUSE_SLEEP = TEN_MILLI_SEC / 1000; // 10 ms - private static final long NO_CLIENT_SLEEP = 1000; // 1s - private volatile boolean NO_CLIENTS = true; - private int _delayShifting; - private final int REPORTS_WITHOUT_CHANGE = Integer.getInteger("stableReportCount", 5); - private boolean _warmedup = false; - private static final long EXPECTED_TIME_PER_BATCH = 100000L; - private int _warmUpBatches = Integer.getInteger("warmUpBatches", 10); - - - SustainedRateAdapter(SustainedTestClient client) - { - _client = client; - } - - public void onMessage(Message message) - { - if (debugLog.isDebugEnabled()) - { - debugLog.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called"); - } - - try - { - String controlType = message.getStringProperty("CONTROL_TYPE"); - - // Check if the message is a test invite. - if ("UPDATE".equals(controlType)) - { - NO_CLIENTS = false; - long duration = message.getLongProperty("DURATION"); - long totalReceived = message.getLongProperty("RECEIVED"); - String client = message.getStringProperty("CLIENT_ID"); - int batchNumber = message.getIntProperty("BATCH"); - - if (debugLog.isInfoEnabled() && batchNumber % DEBUG_LOG_UPATE_INTERVAL == 0) - { - debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived - + " Recevied BATCH:" + batchNumber + " DURATION:" + duration); - } - - recordSlow(client, totalReceived, batchNumber); - - adjustDelay(client, batchNumber, duration); - - // Warm up completes when: - // we haven't warmed up - // and the number of batches sent to each client is at least half of the required warmup batches - if (!_warmedup - && (batchNumber >= _warmUpBatches)) - { - _warmedup = true; - _warmup.countDown(); - - } - } - } - catch (JMSException e) - { - // - } - } - - CountDownLatch _warmup = new CountDownLatch(1); - - int _numBatches = 10000; - - // long[] _timings = new long[_numBatches]; - private boolean _running = true; - - - public void run() - { - log.info("Warming up"); - - doBatch(_warmUpBatches); - - try - { - //wait for warmup to complete. - _warmup.await(); - - //set delay to the average length of the batches - _delay = _totalDuration / _warmUpBatches / delays.size(); - - log.info("Warmup complete delay set : " + _delay - + " based on _totalDuration: " + _totalDuration - + " over no. batches: " + _warmUpBatches - + " with client count: " + delays.size()); - - _totalDuration = 0L; - _totalReceived = 0L; - _sent = 0L; - } - catch (InterruptedException e) - { - // - } - - - doBatch(_numBatches); - - } - - private void doBatch(int batchSize) // long[] timings, - { - TextMessage testMessage = null; - try - { - testMessage = _client.session[0].createTextMessage("start"); - - - for (int batch = 0; batch <= batchSize; batch++) -// while (_running) - { - long start = System.nanoTime(); - - testMessage.setText("start"); - testMessage.setIntProperty("BATCH", batch); - - _client.producer.send(testMessage); - _rateAdapter.sentMessage(); - - testMessage.setText("test"); - //start at 2 so start and end count as part of batch - for (int m = 2; m < _batchSize; m++) - { - _client.producer.send(testMessage); - _rateAdapter.sentMessage(); - } - - testMessage.setText("end"); - _client.producer.send(testMessage); - _rateAdapter.sentMessage(); - - long end = System.nanoTime(); - - long sendtime = end - start; - - if (debugLog.isDebugEnabled()) - { - debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]); - } - - if (batch % LOG_UPATE_INTERVAL == 0) - { - log.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status()); - } - - _rateAdapter.sleepBatch(); - - } - } - catch (JMSException e) - { - log.error("Runner ended"); - } - } - - private String status() - { - return " TotalDuration: " + _totalDuration + " for " + delays.size() + " consumers" - + " Delay is " + _delay + " resulting in " - + ((_delay > TEN_MILLI_SEC * _batchSize) ? (_delay / _batchSize) + "/msg" : _delay + "/batch"); - } - - private void sleepBatch() - { - if (checkForSlowClients()) - {//if there werwe slow clients we have already slept so don't sleep anymore again. - return; - } - - if (!SLEEP_PER_MESSAGE) - { - //per batch sleep.. if sleep is to small to spread over the batch. - if (_delay <= TEN_MILLI_SEC * _batchSize) - { - sleepLong(_delay); - } - else - { - debugLog.info("Not sleeping _delay > ten*batch is:" + _delay); - } - } - } - - public void stop() - { - _running = false; - } - - Map<String, Long> delays = new HashMap<String, Long>(); - Long _totalReceived = 0L; - Long _totalDuration = 0L; - int _skipUpdate = 0; - - /** - * Adjust the delay for sending messages based on this update from the client - * - * @param client The client that send this update - * @param duration The time taken for the last batch of messagse - * @param batchNumber The reported batchnumber from the client - */ - private void adjustDelay(String client, int batchNumber, long duration) - { - //Retrieve the current total time taken for this client. - Long currentTime = delays.get(client); - - // Add the new duration time to this client - if (currentTime == null) - { - currentTime = duration; - } - else - { - currentTime += duration; - } - - delays.put(client, currentTime); - - long batchesSent = _sent / _batchSize; - - // ensure we don't divide by zero - if (batchesSent == 0) - { - batchesSent = 1L; - } - - _totalReceived += _batchSize; - _totalDuration += duration; - - //calculate average duration accross clients per batch - long averageDuration = _totalDuration / delays.size() / batchesSent; - - //calculate the difference between current send delay and average report delay - long diff = (duration) - averageDuration; - - if (debugLog.isInfoEnabled() && batchNumber % DEBUG_LOG_UPATE_INTERVAL == 0) - { - debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers." - + " on batch: " + batchesSent - + " received batch: " + batchNumber - + " Batch Duration: " + duration - + " Average: " + averageDuration - + " so diff: " + diff + " for : " + client - + " Delay is " + _delay + " resulting in " - + ((_delay > TEN_MILLI_SEC * _batchSize) - ? (_delay / _batchSize) + "/msg" : _delay + "/batch")); - } - - //if the averageDuration differs from the current by more than the specified variane then adjust delay. - if (Math.abs(diff) > _timeVariance) - { - - // if the the _delay is larger than the required duration to send report - // speed up - if (diff > TEN_MILLI_SEC) - { - _delay -= TEN_MILLI_SEC; - - if (_delay < 0) - { - _delay = 0; - debugLog.info("Reset _delay to 0"); - delayStable(); - } - else - { - delayChanged(); - } - - } - else if (diff < 0) // diff < 0 diff cannot be 0 as it is > _timeVariance - { - // the report took longer - _delay += TEN_MILLI_SEC; - delayChanged(); - } - } - else - { - delayStable(); - } - - // If we have a consumer that is behind with the batches. - if (batchesSent - batchNumber > _batchVariance) - { - debugLog.debug("Increasing _delay as sending more than receiving"); - - _delay += 2 * TEN_MILLI_SEC; - delayChanged(); - } - - - } - - /** Reset the number of iterations before we say the delay has stabilised. */ - private void delayChanged() - { - _delayShifting = REPORTS_WITHOUT_CHANGE; - } - - /** - * Record the fact that delay has stabilised If delay has stablised for REPORTS_WITHOUT_CHANGE then it will - * output Delay stabilised - */ - private void delayStable() - { - _delayShifting--; - - if (_delayShifting < 0) - { - _delayShifting = 0; - log.debug("Delay stabilised:" + _delay); - } - } - - /** - * Checks that the client has received enough messages. If the client has fallen behind then they are put in the - * _slowClients lists which will increase the delay. - * - * @param client The client identifier to check - * @param received the number of messages received by that client - * @param batchNumber - */ - private void recordSlow(String client, long received, int batchNumber) - { - if (Math.abs(batchNumber - (_sent / _batchSize)) > _batchVariance) - { - _slowClients.put(client, received); - } - else - { - _slowClients.remove(client); - } - } - - /** Incrment the number of sent messages and then sleep, if required. */ - public void sentMessage() - { - - _sent++; - - if (_delay > TEN_MILLI_SEC * _batchSize) - { - long batchDelay = _delay / _batchSize; - // less than 10ms sleep doesn't always work. - // _delay is in nano seconds -// if (batchDelay < (TEN_MILLI_SEC)) -// { -// sleep(0, (int) batchDelay); -// } -// else - { -// if (batchDelay < 30000000000L) - { - sleepLong(batchDelay); - } - } - } - else - { - if (SLEEP_PER_MESSAGE && (_delay > 0)) - { - sleepLong(_delay / _batchSize); - } - } - } - - - /** - * Check at the end of each batch and pause sending messages to allow slow clients to catch up. - * - * @return true if there were slow clients that caught up. - */ - private boolean checkForSlowClients() - { - // This will allways be true as we are running this at the end of each batchSize -// if (_sent % _batchSize == 0) - { - // Cause test to pause when we have slow - if (!_slowClients.isEmpty() || NO_CLIENTS) - { - - - while (!_slowClients.isEmpty()) - { - if (debugLog.isInfoEnabled() - && _sent / _batchSize % DEBUG_LOG_UPATE_INTERVAL == 0) - { - String clients = ""; - Iterator it = _slowClients.keySet().iterator(); - while (it.hasNext()) - { - clients += it.next(); - if (it.hasNext()) - { - clients += ", "; - } - } - debugLog.info("Pausing for slow clients:" + clients); - } - - - if (log.isDebugEnabled() - && _sent / _batchSize % LOG_UPATE_INTERVAL == 0) - { - log.debug(_slowClients.size() + " slow clients."); - } - sleep(PAUSE_SLEEP); - } - - if (NO_CLIENTS) - { - sleep(NO_CLIENT_SLEEP); - } - - debugLog.debug("Continuing"); - return true; - } - else - { - if (_sent / _batchSize % LOG_UPATE_INTERVAL == 0) - { - log.info("Total Delay :" + _delay + " " - + (_delayShifting == 0 ? "Stablised" : "Not Stablised(" + _delayShifting + ")")); - } - } - - } - - return false; - } - - /** - * Sleep normally takes micro-seconds this allows the use of a nano-second value. - * - * @param delay nanoseconds to sleep for. - */ - private void sleepLong(long delay) - { - sleep(delay / 1000000, (int) (delay % 1000000)); - } - - /** - * Sleep for the specified micro-seconds. - * @param sleep microseconds to sleep for. - */ - private void sleep(long sleep) - { - sleep(sleep, 0); - } - - /** - * Perform the sleep , swallowing any InteruptException. - * - * NOTE: If a sleep request is > 10s then reset only sleep for 5s - * - * @param milli to sleep for - * @param nano sub miliseconds to sleep for - */ - private void sleep(long milli, int nano) - { - try - { - debugLog.debug("Sleep:" + milli + ":" + nano); - if (milli > 10000) - { - - if (_delay == milli) - { - _totalDuration = _totalReceived / _batchSize * EXPECTED_TIME_PER_BATCH; - debugLog.error("Sleeping for more than 10 seconds adjusted to 5s!:" + milli / 1000 + "s. Reset _totalDuration:" + _totalDuration); - } - else - { - debugLog.error("Sleeping for more than 10 seconds adjusted to 5s!:" + milli / 1000 + "s"); - } - - milli = 5000; - } - - Thread.sleep(milli, nano); - } - catch (InterruptedException e) - { - // - } - } - - public void setClient(SustainedTestClient client) - { - _client = client; - } - } - -} - +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.sustained;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.interop.testclient.TestClient;
+import org.apache.qpid.interop.testclient.testcases.TestCase3BasicPubSub;
+import org.apache.qpid.test.framework.TestUtils;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the
+ * default topic exchange, using the specified number of receiver connections. Produces reports on the actual number of
+ * messages sent/received.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Supply the name of the test case that this implements.
+ * <tr><td> Accept/Reject invites based on test parameters.
+ * <tr><td> Adapt to assigned roles.
+ * <tr><td> Send required number of test messages using pub/sub. <tr><td> Generate test reports.
+ * </table>
+ */
+public class SustainedClientTestCase extends TestCase3BasicPubSub implements ExceptionListener
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(SustainedClientTestCase.class);
+
+ /** Used to log to the console. */
+ private static final Logger console = Logger.getLogger("SustainedTest");
+
+ /** The role to be played by the test. */
+ private Roles role;
+
+ /** The number of receiver connection to use. */
+ private int numReceivers;
+
+ /** The routing key to send them to on the default direct exchange. */
+ private Destination sendDestination;
+
+ /** The routing key to send updates to on the default direct exchange. */
+ private Destination sendUpdateDestination;
+
+ /** The connections to send/receive the test messages on. */
+ private Connection[] connection;
+
+ /** The sessions to send/receive the test messages on. */
+ private Session[] session;
+
+ /** The producer to send the test messages with. */
+ MessageProducer producer;
+
+ /** Adapter that adjusts the send rate based on the updates from clients. */
+ SustainedRateAdapter _rateAdapter;
+
+ /** */
+ int _batchSize;
+
+ private static final long TEN_MILLI_SEC = 10000000;
+ private static final int DEBUG_LOG_UPATE_INTERVAL = 10;
+ private static final int LOG_UPATE_INTERVAL = 10;
+ private static final boolean SLEEP_PER_MESSAGE = Boolean.getBoolean("sleepPerMessage");
+
+ /**
+ * Should provide the name of the test case that this class implements. The exact names are defined in the interop
+ * testing spec.
+ *
+ * @return The name of the test case that this implements.
+ */
+ public String getName()
+ {
+ log.debug("public String getName(): called");
+
+ return "Perf_SustainedPubSub";
+ }
+
+ /**
+ * Assigns the role to be played by this test case. The test parameters are fully specified in the assignment
+ * message. When this method return the test case will be ready to execute.
+ *
+ * @param role The role to be played; sender or receiver.
+ * @param assignRoleMessage The role assingment message, contains the full test parameters.
+ *
+ * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
+ */
+ public void assignRole(Roles role, Message assignRoleMessage) throws JMSException
+ {
+ log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage
+ + "): called");
+
+ // Take note of the role to be played.
+ this.role = role;
+
+ // Extract and retain the test parameters.
+ numReceivers = assignRoleMessage.getIntProperty("SUSTAINED_NUM_RECEIVERS");
+ _batchSize = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL");
+ String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY");
+ String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY");
+ int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE");
+ String clientName = assignRoleMessage.getStringProperty("CLIENT_NAME");
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("numReceivers = " + numReceivers);
+ log.debug("_batchSize = " + _batchSize);
+ log.debug("ackMode = " + ackMode);
+ log.debug("sendKey = " + sendKey);
+ log.debug("sendUpdateKey = " + sendUpdateKey);
+ log.debug("role = " + role);
+ }
+
+ switch (role)
+ {
+ // Check if the sender role is being assigned, and set up a single message producer if so.
+ case SENDER:
+ console.info("Creating Sender");
+ // Create a new connection to pass the test messages on.
+ connection = new Connection[1];
+ session = new Session[1];
+
+ connection[0] = TestUtils.createConnection(TestClient.testContextProperties);
+ session[0] = connection[0].createSession(false, ackMode);
+
+ // Extract and retain the test parameters.
+ sendDestination = session[0].createTopic(sendKey);
+
+ connection[0].setExceptionListener(this);
+
+ producer = session[0].createProducer(sendDestination);
+
+ sendUpdateDestination = session[0].createTopic(sendUpdateKey);
+ MessageConsumer updateConsumer = session[0].createConsumer(sendUpdateDestination);
+
+ _rateAdapter = new SustainedRateAdapter(this);
+ updateConsumer.setMessageListener(_rateAdapter);
+
+ break;
+
+ // Otherwise the receiver role is being assigned, so set this up to listen for messages on the required number
+ // of receiver connections.
+ case RECEIVER:
+ console.info("Creating Receiver");
+ // Create the required number of receiver connections.
+ connection = new Connection[numReceivers];
+ session = new Session[numReceivers];
+
+ for (int i = 0; i < numReceivers; i++)
+ {
+ connection[i] = TestUtils.createConnection(TestClient.testContextProperties);
+ session[i] = connection[i].createSession(false, ackMode);
+
+ sendDestination = session[i].createTopic(sendKey);
+
+ sendUpdateDestination = session[i].createTopic(sendUpdateKey);
+
+ MessageConsumer consumer = session[i].createConsumer(sendDestination);
+
+ consumer.setMessageListener(new SustainedListener(clientName + "-" + i, _batchSize, session[i],
+ sendUpdateDestination));
+ }
+
+ break;
+ }
+
+ // Start all the connection dispatcher threads running.
+ for (int i = 0; i < connection.length; i++)
+ {
+ connection[i].start();
+ }
+ }
+
+ /** Performs the test case actions. */
+ public void start() throws JMSException
+ {
+ log.debug("public void start(): called");
+
+ // Check that the sender role is being performed.
+ switch (role)
+ {
+ // Check if the sender role is being assigned, and set up a single message producer if so.
+ case SENDER:
+ _rateAdapter.run();
+ break;
+ case RECEIVER:
+
+ }
+
+ // return from here when you have finished the test.. this will signal the controller and
+ }
+
+ public void terminate() throws JMSException, InterruptedException
+ {
+ if (_rateAdapter != null)
+ {
+ _rateAdapter.stop();
+ }
+ }
+
+ /**
+ * Gets a report on the actions performed by the test case in its assigned role.
+ *
+ * @param session The session to create the report message in.
+ *
+ * @return The report message.
+ *
+ * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through.
+ */
+ public Message getReport(Session session) throws JMSException
+ {
+ log.debug("public Message getReport(Session session): called");
+
+ // Close the test connections.
+ for (int i = 0; i < connection.length; i++)
+ {
+ connection[i].close();
+ }
+
+ Message report = session.createMessage();
+ report.setStringProperty("CONTROL_TYPE", "REPORT");
+
+ return report;
+ }
+
+ public void onException(JMSException jmsException)
+ {
+ Exception linked = jmsException.getLinkedException();
+
+ if (linked != null)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("Linked Exception:" + linked);
+ }
+
+ if ((linked instanceof AMQNoRouteException) || (linked instanceof AMQNoConsumersException))
+ {
+ if (log.isDebugEnabled())
+ {
+ if (linked instanceof AMQNoConsumersException)
+ {
+ log.warn("No clients currently available for message:"
+ + ((AMQNoConsumersException) linked).getUndeliveredMessage());
+ }
+ else
+ {
+ log.warn("No route for message");
+ }
+ }
+
+ // Tell the rate adapter that there are no clients ready yet
+ _rateAdapter.NO_CLIENTS = true;
+ }
+ }
+ else
+ {
+ log.warn("Exception:" + linked);
+ }
+ }
+
+ /**
+ * Inner class that listens for messages and sends a report for the time taken between receiving the 'start' and
+ * 'end' messages.
+ */
+ class SustainedListener implements MessageListener
+ {
+ /** Number of messages received */
+ private long _received = 0;
+ /** The number of messages in the batch */
+ private int _batchSize = 0;
+ /** Record of the when the 'start' messagse was sen */
+ private Long _startTime;
+ /** Message producer to use to send reports */
+ MessageProducer _updater;
+ /** Session to create the report message on */
+ Session _session;
+ /** Record of the client ID used for this SustainedListnener */
+ String _client;
+
+ /**
+ * Main Constructor
+ *
+ * @param clientname The _client id used to identify this connection.
+ * @param batchSize The number of messages that are to be sent per batch. Note: This is not used to
+ * control the interval between sending reports.
+ * @param session The session used for communication.
+ * @param sendDestination The destination that update reports should be sent to.
+ *
+ * @throws JMSException My occur if creatingthe Producer fails
+ */
+ public SustainedListener(String clientname, int batchSize, Session session, Destination sendDestination)
+ throws JMSException
+ {
+ _batchSize = batchSize;
+ _client = clientname;
+ _session = session;
+ _updater = session.createProducer(sendDestination);
+ }
+
+ public void onMessage(Message message)
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace("Message " + _received + "received in listener");
+ }
+
+ if (message instanceof TextMessage)
+ {
+ try
+ {
+ _received++;
+ if (((TextMessage) message).getText().equals("start"))
+ {
+ log.debug("Starting Batch");
+ _startTime = System.nanoTime();
+ }
+ else if (((TextMessage) message).getText().equals("end"))
+ {
+ if (_startTime != null)
+ {
+ long currentTime = System.nanoTime();
+ sendStatus(currentTime - _startTime, _received, message.getIntProperty("BATCH"));
+ log.debug("End Batch");
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+ // ignore error
+ }
+ }
+
+ }
+
+ /**
+ * sendStatus creates and sends the report back to the publisher
+ *
+ * @param time taken for the the last batch
+ * @param received Total number of messages received.
+ * @param batchNumber the batch number
+ * @throws JMSException if an error occurs during the send
+ */
+ private void sendStatus(long time, long received, int batchNumber) throws JMSException
+ {
+ Message updateMessage = _session.createTextMessage("update");
+ updateMessage.setStringProperty("CLIENT_ID", ":" + _client);
+ updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE");
+ updateMessage.setLongProperty("RECEIVED", received);
+ updateMessage.setIntProperty("BATCH", batchNumber);
+ updateMessage.setLongProperty("DURATION", time);
+
+ if (log.isInfoEnabled())
+ {
+ log.info("**** SENDING [" + batchNumber + "]**** " + "CLIENT_ID:" + _client + " RECEIVED:" + received
+ + " BATCH:" + batchNumber + " DURATION:" + time);
+ }
+
+ // Output on the main console.info the details of this batch
+ if ((batchNumber % 10) == 0)
+ {
+ console.info("Sending Report [" + batchNumber + "] " + "CLIENT_ID:" + _client + " RECEIVED:" + received
+ + " BATCH:" + batchNumber + " DURATION:" + time);
+ }
+
+ _updater.send(updateMessage);
+ }
+ }
+
+ /**
+ * This class is used here to adjust the _delay value which in turn is used to control the number of messages/second
+ * that are sent through the test system.
+ *
+ * By keeping a record of the messages recevied and the average time taken to process the batch size can be
+ * calculated and so the delay can be adjusted to maintain that rate.
+ *
+ * Given that delays of < 10ms can be rounded up the delay is only used between messages if the _delay > 10ms * no
+ * messages in the batch. Otherwise the delay is used at the end of the batch.
+ */
+ class SustainedRateAdapter implements MessageListener, Runnable
+ {
+ private SustainedClientTestCase _client;
+ private long _batchVariance = Integer.getInteger("batchVariance", 3); // no. batches to allow drifting
+ private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms)
+ private volatile long _delay; // in nanos
+ private long _sent;
+ private Map<String, Long> _slowClients = new HashMap<String, Long>();
+ private static final long PAUSE_SLEEP = TEN_MILLI_SEC / 1000; // 10 ms
+ private static final long NO_CLIENT_SLEEP = 1000; // 1s
+ private volatile boolean NO_CLIENTS = true;
+ private int _delayShifting;
+ private final int REPORTS_WITHOUT_CHANGE = Integer.getInteger("stableReportCount", 5);
+ private boolean _warmedup = false;
+ private static final long EXPECTED_TIME_PER_BATCH = 100000L;
+ private int _warmUpBatches = Integer.getInteger("warmUpBatches", 10);
+
+ SustainedRateAdapter(SustainedClientTestCase client)
+ {
+ _client = client;
+ }
+
+ public void onMessage(Message message)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called");
+ }
+
+ try
+ {
+ String controlType = message.getStringProperty("CONTROL_TYPE");
+
+ // Check if the message is a test invite.
+ if ("UPDATE".equals(controlType))
+ {
+ NO_CLIENTS = false;
+ long duration = message.getLongProperty("DURATION");
+ long totalReceived = message.getLongProperty("RECEIVED");
+ String client = message.getStringProperty("CLIENT_ID");
+ int batchNumber = message.getIntProperty("BATCH");
+
+ if (log.isInfoEnabled() && ((batchNumber % DEBUG_LOG_UPATE_INTERVAL) == 0))
+ {
+ log.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + " Recevied BATCH:"
+ + batchNumber + " DURATION:" + duration);
+ }
+
+ recordSlow(client, totalReceived, batchNumber);
+
+ adjustDelay(client, batchNumber, duration);
+
+ // Warm up completes when:
+ // we haven't warmed up
+ // and the number of batches sent to each client is at least half of the required warmup batches
+ if (!_warmedup && (batchNumber >= _warmUpBatches))
+ {
+ _warmedup = true;
+ _warmup.countDown();
+
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+ //
+ }
+ }
+
+ CountDownLatch _warmup = new CountDownLatch(1);
+
+ int _numBatches = 10000;
+
+ // long[] _timings = new long[_numBatches];
+ private boolean _running = true;
+
+ public void run()
+ {
+ console.info("Warming up");
+
+ doBatch(_warmUpBatches);
+
+ try
+ {
+ // wait for warmup to complete.
+ _warmup.await();
+
+ // set delay to the average length of the batches
+ _delay = _totalDuration / _warmUpBatches / delays.size();
+
+ console.info("Warmup complete delay set : " + _delay + " based on _totalDuration: " + _totalDuration
+ + " over no. batches: " + _warmUpBatches + " with client count: " + delays.size());
+
+ _totalDuration = 0L;
+ _totalReceived = 0L;
+ _sent = 0L;
+ }
+ catch (InterruptedException e)
+ {
+ //
+ }
+
+ doBatch(_numBatches);
+
+ }
+
+ private void doBatch(int batchSize) // long[] timings,
+ {
+ TextMessage testMessage = null;
+ try
+ {
+ testMessage = _client.session[0].createTextMessage("start");
+
+ for (int batch = 0; batch <= batchSize; batch++)
+ // while (_running)
+ {
+ long start = System.nanoTime();
+
+ testMessage.setText("start");
+ testMessage.setIntProperty("BATCH", batch);
+
+ _client.producer.send(testMessage);
+ _rateAdapter.sentMessage();
+
+ testMessage.setText("test");
+ // start at 2 so start and end count as part of batch
+ for (int m = 2; m < _batchSize; m++)
+ {
+ _client.producer.send(testMessage);
+ _rateAdapter.sentMessage();
+ }
+
+ testMessage.setText("end");
+ _client.producer.send(testMessage);
+ _rateAdapter.sentMessage();
+
+ long end = System.nanoTime();
+
+ long sendtime = end - start;
+
+ if (log.isDebugEnabled())
+ {
+ log.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime); // timings[batch]);
+ }
+
+ if ((batch % LOG_UPATE_INTERVAL) == 0)
+ {
+ console.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status());
+ }
+
+ _rateAdapter.sleepBatch();
+
+ }
+ }
+ catch (JMSException e)
+ {
+ console.error("Runner ended");
+ }
+ }
+
+ private String status()
+ {
+ return " TotalDuration: " + _totalDuration + " for " + delays.size() + " consumers" + " Delay is " + _delay
+ + " resulting in "
+ + ((_delay > (TEN_MILLI_SEC * _batchSize)) ? ((_delay / _batchSize) + "/msg") : (_delay + "/batch"));
+ }
+
+ private void sleepBatch()
+ {
+ if (checkForSlowClients())
+ { // if there werwe slow clients we have already slept so don't sleep anymore again.
+ return;
+ }
+
+ if (!SLEEP_PER_MESSAGE)
+ {
+ // per batch sleep.. if sleep is to small to spread over the batch.
+ if (_delay <= (TEN_MILLI_SEC * _batchSize))
+ {
+ sleepLong(_delay);
+ }
+ else
+ {
+ log.info("Not sleeping _delay > ten*batch is:" + _delay);
+ }
+ }
+ }
+
+ public void stop()
+ {
+ _running = false;
+ }
+
+ Map<String, Long> delays = new HashMap<String, Long>();
+ Long _totalReceived = 0L;
+ Long _totalDuration = 0L;
+ int _skipUpdate = 0;
+
+ /**
+ * Adjust the delay for sending messages based on this update from the client
+ *
+ * @param client The client that send this update
+ * @param duration The time taken for the last batch of messagse
+ * @param batchNumber The reported batchnumber from the client
+ */
+ private void adjustDelay(String client, int batchNumber, long duration)
+ {
+ // Retrieve the current total time taken for this client.
+ Long currentTime = delays.get(client);
+
+ // Add the new duration time to this client
+ if (currentTime == null)
+ {
+ currentTime = duration;
+ }
+ else
+ {
+ currentTime += duration;
+ }
+
+ delays.put(client, currentTime);
+
+ long batchesSent = _sent / _batchSize;
+
+ // ensure we don't divide by zero
+ if (batchesSent == 0)
+ {
+ batchesSent = 1L;
+ }
+
+ _totalReceived += _batchSize;
+ _totalDuration += duration;
+
+ // calculate average duration accross clients per batch
+ long averageDuration = _totalDuration / delays.size() / batchesSent;
+
+ // calculate the difference between current send delay and average report delay
+ long diff = (duration) - averageDuration;
+
+ if (log.isInfoEnabled() && ((batchNumber % DEBUG_LOG_UPATE_INTERVAL) == 0))
+ {
+ log.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers." + " on batch: "
+ + batchesSent + " received batch: " + batchNumber + " Batch Duration: " + duration + " Average: "
+ + averageDuration + " so diff: " + diff + " for : " + client + " Delay is " + _delay + " resulting in "
+ + ((_delay > (TEN_MILLI_SEC * _batchSize)) ? ((_delay / _batchSize) + "/msg") : (_delay + "/batch")));
+ }
+
+ // if the averageDuration differs from the current by more than the specified variane then adjust delay.
+ if (Math.abs(diff) > _timeVariance)
+ {
+
+ // if the the _delay is larger than the required duration to send report
+ // speed up
+ if (diff > TEN_MILLI_SEC)
+ {
+ _delay -= TEN_MILLI_SEC;
+
+ if (_delay < 0)
+ {
+ _delay = 0;
+ log.info("Reset _delay to 0");
+ delayStable();
+ }
+ else
+ {
+ delayChanged();
+ }
+
+ }
+ else if (diff < 0) // diff < 0 diff cannot be 0 as it is > _timeVariance
+ {
+ // the report took longer
+ _delay += TEN_MILLI_SEC;
+ delayChanged();
+ }
+ }
+ else
+ {
+ delayStable();
+ }
+
+ // If we have a consumer that is behind with the batches.
+ if ((batchesSent - batchNumber) > _batchVariance)
+ {
+ log.debug("Increasing _delay as sending more than receiving");
+
+ _delay += 2 * TEN_MILLI_SEC;
+ delayChanged();
+ }
+
+ }
+
+ /** Reset the number of iterations before we say the delay has stabilised. */
+ private void delayChanged()
+ {
+ _delayShifting = REPORTS_WITHOUT_CHANGE;
+ }
+
+ /**
+ * Record the fact that delay has stabilised If delay has stablised for REPORTS_WITHOUT_CHANGE then it will
+ * output Delay stabilised
+ */
+ private void delayStable()
+ {
+ _delayShifting--;
+
+ if (_delayShifting < 0)
+ {
+ _delayShifting = 0;
+ console.debug("Delay stabilised:" + _delay);
+ }
+ }
+
+ /**
+ * Checks that the client has received enough messages. If the client has fallen behind then they are put in the
+ * _slowClients lists which will increase the delay.
+ *
+ * @param client The client identifier to check
+ * @param received the number of messages received by that client
+ * @param batchNumber
+ */
+ private void recordSlow(String client, long received, int batchNumber)
+ {
+ if (Math.abs(batchNumber - (_sent / _batchSize)) > _batchVariance)
+ {
+ _slowClients.put(client, received);
+ }
+ else
+ {
+ _slowClients.remove(client);
+ }
+ }
+
+ /** Incrment the number of sent messages and then sleep, if required. */
+ public void sentMessage()
+ {
+
+ _sent++;
+
+ if (_delay > (TEN_MILLI_SEC * _batchSize))
+ {
+ long batchDelay = _delay / _batchSize;
+ // less than 10ms sleep doesn't always work.
+ // _delay is in nano seconds
+ // if (batchDelay < (TEN_MILLI_SEC))
+ // {
+ // sleep(0, (int) batchDelay);
+ // }
+ // else
+ {
+ // if (batchDelay < 30000000000L)
+ {
+ sleepLong(batchDelay);
+ }
+ }
+ }
+ else
+ {
+ if (SLEEP_PER_MESSAGE && (_delay > 0))
+ {
+ sleepLong(_delay / _batchSize);
+ }
+ }
+ }
+
+ /**
+ * Check at the end of each batch and pause sending messages to allow slow clients to catch up.
+ *
+ * @return true if there were slow clients that caught up.
+ */
+ private boolean checkForSlowClients()
+ {
+ // This will allways be true as we are running this at the end of each batchSize
+ // if (_sent % _batchSize == 0)
+ {
+ // Cause test to pause when we have slow
+ if (!_slowClients.isEmpty() || NO_CLIENTS)
+ {
+
+ while (!_slowClients.isEmpty())
+ {
+ if (log.isInfoEnabled() && ((_sent / _batchSize % DEBUG_LOG_UPATE_INTERVAL) == 0))
+ {
+ String clients = "";
+ Iterator it = _slowClients.keySet().iterator();
+ while (it.hasNext())
+ {
+ clients += it.next();
+ if (it.hasNext())
+ {
+ clients += ", ";
+ }
+ }
+
+ log.info("Pausing for slow clients:" + clients);
+ }
+
+ if (console.isDebugEnabled() && ((_sent / _batchSize % LOG_UPATE_INTERVAL) == 0))
+ {
+ console.debug(_slowClients.size() + " slow clients.");
+ }
+
+ sleep(PAUSE_SLEEP);
+ }
+
+ if (NO_CLIENTS)
+ {
+ sleep(NO_CLIENT_SLEEP);
+ }
+
+ log.debug("Continuing");
+
+ return true;
+ }
+ else
+ {
+ if ((_sent / _batchSize % LOG_UPATE_INTERVAL) == 0)
+ {
+ console.info("Total Delay :" + _delay + " "
+ + ((_delayShifting == 0) ? "Stablised" : ("Not Stablised(" + _delayShifting + ")")));
+ }
+ }
+
+ }
+
+ return false;
+ }
+
+ /**
+ * Sleep normally takes micro-seconds this allows the use of a nano-second value.
+ *
+ * @param delay nanoseconds to sleep for.
+ */
+ private void sleepLong(long delay)
+ {
+ sleep(delay / 1000000, (int) (delay % 1000000));
+ }
+
+ /**
+ * Sleep for the specified micro-seconds.
+ * @param sleep microseconds to sleep for.
+ */
+ private void sleep(long sleep)
+ {
+ sleep(sleep, 0);
+ }
+
+ /**
+ * Perform the sleep , swallowing any InteruptException.
+ *
+ * NOTE: If a sleep request is > 10s then reset only sleep for 5s
+ *
+ * @param milli to sleep for
+ * @param nano sub miliseconds to sleep for
+ */
+ private void sleep(long milli, int nano)
+ {
+ try
+ {
+ log.debug("Sleep:" + milli + ":" + nano);
+ if (milli > 10000)
+ {
+
+ if (_delay == milli)
+ {
+ _totalDuration = _totalReceived / _batchSize * EXPECTED_TIME_PER_BATCH;
+ log.error("Sleeping for more than 10 seconds adjusted to 5s!:" + (milli / 1000)
+ + "s. Reset _totalDuration:" + _totalDuration);
+ }
+ else
+ {
+ log.error("Sleeping for more than 10 seconds adjusted to 5s!:" + (milli / 1000) + "s");
+ }
+
+ milli = 5000;
+ }
+
+ Thread.sleep(milli, nano);
+ }
+ catch (InterruptedException e)
+ {
+ //
+ }
+ }
+
+ public void setClient(SustainedClientTestCase client)
+ {
+ _client = client;
+ }
+ }
+
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java new file mode 100644 index 0000000000..3dd1326d80 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java @@ -0,0 +1,125 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.sustained;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.interop.coordinator.DropInTest;
+import org.apache.qpid.interop.coordinator.TestClientDetails;
+import org.apache.qpid.interop.coordinator.FanOutTestCase;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * SustainedTestCase is a {@link FanOutTestCase} that runs the "Perf_SustainedPubSub" test case. This consists of one
+ * test client sending, and several receiving, and attempts to find the highest rate at which messages can be broadcast
+ * to the receivers. It is also a {@link DropInTest} to which more test clients may be added during a test run.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td>
+ * </table>
+ */
+public class SustainedTestCase extends FanOutTestCase implements DropInTest
+{
+ /** Used for debugging. */
+ Logger log = Logger.getLogger(SustainedTestCase.class);
+
+ /** Holds the root name of the topic on which to send the test messages. */
+ private static final String SUSTAINED_KEY = "Perf_SustainedPubSub";
+
+ /**
+ * Creates a new coordinating test case with the specified name.
+ *
+ * @param name The test case name.
+ */
+ public SustainedTestCase(String name)
+ {
+ super(name);
+ }
+
+ /**
+ * Performs a single test run of the sustained test.
+ *
+ * @throws Exception Any exceptions are allowed to fall through and fail the test.
+ */
+ public void testBasicPubSub() throws Exception
+ {
+ log.debug("public void testSinglePubSubCycle(): called");
+
+ Map<String, Object> testConfig = new HashMap<String, Object>();
+ testConfig.put("TEST_NAME", "Perf_SustainedPubSub");
+ testConfig.put("SUSTAINED_KEY", SUSTAINED_KEY);
+ testConfig.put("SUSTAINED_NUM_RECEIVERS", Integer.getInteger("numReceives", 2));
+ testConfig.put("SUSTAINED_UPDATE_INTERVAL", Integer.getInteger("batchSize", 1000));
+ testConfig.put("SUSTAINED_UPDATE_KEY", SUSTAINED_KEY + ".UPDATE");
+ testConfig.put("ACKNOWLEDGE_MODE", Integer.getInteger("ackMode", AMQSession.AUTO_ACKNOWLEDGE));
+
+ log.info("Created Config: " + testConfig.entrySet().toArray());
+
+ sequenceTest(testConfig);
+ }
+
+ /**
+ * Accepts a late joining client into this test case. The client will be enlisted with a control message
+ * with the 'CONTROL_TYPE' field set to the value 'LATEJOIN'. It should also provide values for the fields:
+ *
+ * <p/><table>
+ * <tr><td> CLIENT_NAME <td> A unique name for the new client.
+ * <tr><td> CLIENT_PRIVATE_CONTROL_KEY <td> The key for the route on which the client receives its control messages.
+ * </table>
+ *
+ * @param message The late joiners join message.
+ *
+ * @throws JMSException Any JMS Exception are allowed to fall through, indicating that the join failed.
+ */
+ public void lateJoin(Message message) throws JMSException
+ {
+ // Extract the joining clients details from its join request message.
+ TestClientDetails clientDetails = new TestClientDetails();
+ clientDetails.clientName = message.getStringProperty("CLIENT_NAME");
+ clientDetails.privateControlKey = message.getStringProperty("CLIENT_PRIVATE_CONTROL_KEY");
+
+ // Register the joining client, but do block for confirmation as cannot do a synchronous receiver during this
+ // method call, as it may have been called from an 'onMessage' method.
+ assignReceiverRole(clientDetails, new HashMap<String, Object>(), false);
+ }
+
+ /**
+ * Should provide a translation from the junit method name of a test to its test case name as known to the test
+ * clients that will run the test. The purpose of this is to convert the JUnit method name into the correct test
+ * case name to place into the test invite. For example the method "testP2P" might map onto the interop test case
+ * name "TC2_BasicP2P".
+ *
+ * @param methodName The name of the JUnit test method.
+ *
+ * @return The name of the corresponding interop test case.
+ */
+ public String getTestCaseNameForTestMethod(String methodName)
+ {
+ return "Perf_SustainedPubSub";
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java deleted file mode 100644 index 0075e45a8c..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.sustained; - -import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.interop.coordinator.ListeningCoordinatorTest; -import org.apache.qpid.interop.coordinator.TestClientDetails; -import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase3BasicPubSub; -import org.apache.qpid.util.ConversationFactory; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Session; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub implements ListeningCoordinatorTest -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(SustainedTestCoordinator.class); - private List<TestClientDetails> _receivers; - private static final String SUSTAINED_KEY = "Perf_SustainedPubSub"; - Map<String, Object> _testProperties; - - /** - * Creates a new coordinating test case with the specified name. - * - * @param name The test case name. - */ - public SustainedTestCoordinator(String name) - { - super(name); - _receivers = new LinkedList(); - } - - /** - * Adds a receiver to this test. - * - * @param receiver The contact details of the sending client in the test. - */ - public void setReceiver(TestClientDetails receiver) - { - _receivers.add(receiver); - } - - - /** - * Performs the a single test run - * - * @throws Exception if there was a problem running the test. - */ - public void testBasicPubSub() throws Exception - { - log.debug("public void testSinglePubSubCycle(): called"); - - Map<String, Object> testConfig = new HashMap<String, Object>(); - testConfig.put("TEST_NAME", "Perf_SustainedPubSub"); - testConfig.put("SUSTAINED_KEY", SUSTAINED_KEY); - testConfig.put("SUSTAINED_NUM_RECEIVERS", Integer.getInteger("numReceives", 2)); - testConfig.put("SUSTAINED_UPDATE_INTERVAL", Integer.getInteger("batchSize", 1000)); - testConfig.put("SUSTAINED_UPDATE_KEY", SUSTAINED_KEY + ".UPDATE"); - testConfig.put("ACKNOWLEDGE_MODE", Integer.getInteger("ackMode", AMQSession.AUTO_ACKNOWLEDGE)); - - log.info("Created Config: " + testConfig.entrySet().toArray()); - - sequenceTest(testConfig); - } - - /** - * Holds a test coordinating conversation with the test clients. This is the basic implementation of the inner loop - * of Use Case 5. It consists of assigning the test roles, begining the test and gathering the test reports from the - * participants. - * - * @param testProperties The test case definition. - * - * @return The test results from the senders and receivers. - * - * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through. - */ - protected Message[] sequenceTest(Map<String, Object> testProperties) throws JMSException - { - log.debug("protected Message[] sequenceTest(Object... testProperties = " + testProperties + "): called"); - - Session session = conversationFactory.getSession(); - Destination senderControlTopic = session.createTopic(sender.privateControlKey); - - ConversationFactory.Conversation senderConversation = conversationFactory.startConversation(); - - // Assign the sender role to the sending test client. - Message assignSender = conversationFactory.getSession().createMessage(); - setPropertiesOnMessage(assignSender, testProperties); - assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); - assignSender.setStringProperty("ROLE", "SENDER"); - assignSender.setStringProperty("CLIENT_NAME", "Sustained_SENDER"); - - senderConversation.send(senderControlTopic, assignSender); - - //Assign and wait for the receiver ckuebts to be ready. - _testProperties = testProperties; - - // Wait for the senders to confirm their roles. - senderConversation.receive(); - - assignReceivers(); - - // Start the test. - Message start = session.createMessage(); - start.setStringProperty("CONTROL_TYPE", "START"); - - senderConversation.send(senderControlTopic, start); - - // Wait for the test sender to return its report. - Message senderReport = senderConversation.receive(); - - try - { - Thread.sleep(500); - } - catch (InterruptedException e) - { - } - - // Ask the receiver for its report. - Message statusRequest = session.createMessage(); - statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST"); - - - return new Message[]{senderReport}; - } - - private void assignReceivers() - { - for (TestClientDetails receiver : _receivers) - { - registerReceiver(receiver); - } - } - - private void registerReceiver(TestClientDetails receiver) - { - log.info("registerReceiver called for receiver:" + receiver); - try - { - Session session = conversationFactory.getSession(); - Destination receiverControlTopic = session.createTopic(receiver.privateControlKey); - ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation(); - // Assign the receiver role the receiving client. - Message assignReceiver = session.createMessage(); - setPropertiesOnMessage(assignReceiver, _testProperties); - assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); - assignReceiver.setStringProperty("ROLE", "RECEIVER"); - assignReceiver.setStringProperty("CLIENT_NAME", "Sustained_RECEIVER_" + receiver.clientName); - - receiverConversation.send(receiverControlTopic, assignReceiver); - - //Don't wait for receiver to be ready.... we can't this is being done in - // the dispatcher thread, and most likely the acceptance message we - // want is sitting in the Dispatcher._queue waiting its turn for being - // dispatched so if we block here we won't can't get the message. - // So assume consumer is ready for action. - //receiverConversation.receive(); - } - catch (JMSException e) - { - log.warn("Unable to assign receiver:" + receiver + ". Due to:" + e.getMessage()); - } - } - - public void latejoin(Message message) - { - try - { - - TestClientDetails clientDetails = new TestClientDetails(); - clientDetails.clientName = message.getStringProperty("CLIENT_NAME"); - clientDetails.privateControlKey = message.getStringProperty("CLIENT_PRIVATE_CONTROL_KEY"); - - - registerReceiver(clientDetails); - } - catch (JMSException e) - { - //swallow - } - } - - /** - * Should provide a translation from the junit method name of a test to its test case name as defined in the interop - * testing specification. For example the method "testP2P" might map onto the interop test case name - * "TC2_BasicP2P". - * - * @param methodName The name of the JUnit test method. - * - * @return The name of the corresponding interop test case. - */ - public String getTestCaseNameForTestMethod(String methodName) - { - return "Perf_SustainedPubSub"; - } -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java deleted file mode 100644 index 44fc090410..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.sustained; - -import org.apache.log4j.Logger; -import org.apache.qpid.interop.testclient.InteropClientTestCase; -import org.apache.qpid.util.CommandLineParser; - -import javax.jms.JMSException; -import javax.jms.Message; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Properties; - -public class TestClient extends org.apache.qpid.interop.testclient.TestClient -{ - private static Logger log = Logger.getLogger(TestClient.class); - - /** - * Creates a new interop test client, listenting to the specified broker and virtual host, with the specified client - * identifying name. - * - * @param brokerUrl The url of the broker to connect to. - * @param virtualHost The virtual host to conect to. - * @param clientName The client name to use. - */ - public TestClient(String brokerUrl, String virtualHost, String clientName) - { - super(brokerUrl, virtualHost, clientName); - } - - /** - * The entry point for the interop test coordinator. This client accepts the following command line arguments: - * - * <p/><table> <tr><td> -b <td> The broker URL. <td> Optional. <tr><td> -h <td> The virtual - * host. <td> Optional. <tr><td> -n <td> The test client name. <td> Optional. <tr><td> name=value <td> - * Trailing argument define name/value pairs. Added to system properties. <td> Optional. </table> - * - * @param args The command line arguments. - */ - public static void main(String[] args) - { - // Use the command line parser to evaluate the command line. - CommandLineParser commandLine = - new CommandLineParser( - new String[][] - { - {"b", "The broker URL.", "broker", "false"}, - {"h", "The virtual host to use.", "virtual host", "false"}, - {"n", "The test client name.", "name", "false"}, - {"j", "Join this test client to running test.", "join", ""} - }); - - // Capture the command line arguments or display errors and correct usage and then exit. - Properties options = null; - - try - { - options = commandLine.parseCommandLine(args); - } - catch (IllegalArgumentException e) - { - System.out.println(commandLine.getErrors()); - System.out.println(commandLine.getUsage()); - System.exit(1); - } - - // Extract the command line options. - String brokerUrl = options.getProperty("b"); - String virtualHost = options.getProperty("h"); - String clientName = options.getProperty("n"); - String join = options.getProperty("j"); - - // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up - // overridden values from there. - commandLine.addCommandLineToSysProperties(); - - // Create a test client and start it running. - TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName); - - // Use a class path scanner to find all the interop test case implementations. - Collection<Class<? extends InteropClientTestCase>> testCaseClasses = - new ArrayList<Class<? extends InteropClientTestCase>>(); - // ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true); - // Hard code the test classes till the classpath scanner is fixed. - Collections.addAll(testCaseClasses, - SustainedTestClient.class); - - - try - { - client.start(testCaseClasses, join); - } - catch (Exception e) - { - log.error("The test client was unable to start.", e); - System.exit(1); - } - } - - protected void start(Collection<Class<? extends InteropClientTestCase>> testCaseClasses, String join) throws JMSException, ClassNotFoundException - { - super.start(testCaseClasses); - log.debug("private void start(): called"); - - if (join != null && !join.equals("")) - { - Message latejoin = session.createMessage(); - - try - { - Object test = Class.forName(join).newInstance(); - if (test instanceof InteropClientTestCase) - { - currentTestCase = (InteropClientTestCase) test; - } - else - { - throw new RuntimeException("Requested to join class '" + join + "' but this is not a InteropClientTestCase."); - } - - latejoin.setStringProperty("CONTROL_TYPE", "LATEJOIN"); - latejoin.setStringProperty("CLIENT_NAME", clientName); - latejoin.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName); - producer.send(session.createTopic("iop.control.test." + currentTestCase.getName()), latejoin); - } - catch (InstantiationException e) - { - log.warn("Unable to request latejoining of test:" + currentTestCase); - } - catch (IllegalAccessException e) - { - log.warn("Unable to request latejoining of test:" + currentTestCase); - } - } - } - -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java deleted file mode 100644 index 7e12fe39fb..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.sustained; - -import org.apache.qpid.interop.coordinator.Coordinator; -import org.apache.qpid.interop.coordinator.ListeningTestDecorator; -import org.apache.qpid.interop.coordinator.TestClientDetails; -import org.apache.qpid.util.CommandLineParser; -import org.apache.qpid.util.ConversationFactory; -import org.apache.log4j.Logger; - -import java.util.Properties; -import java.util.Set; - -import junit.framework.TestResult; -import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; - -import javax.jms.Connection; - -public class TestCoordinator extends Coordinator -{ - - private static final Logger log = Logger.getLogger(TestCoordinator.class); - - /** - * Creates an interop test coordinator on the specified broker and virtual host. - * - * @param brokerUrl The URL of the broker to connect to. - * @param virtualHost The virtual host to run all tests on. Optional, may be <tt>null</tt>. - */ - TestCoordinator(String brokerUrl, String virtualHost) - { - super(brokerUrl, virtualHost); - } - - protected WrappedSuiteTestDecorator newTestDecorator(WrappedSuiteTestDecorator targetTest, Set<TestClientDetails> enlistedClients, ConversationFactory conversationFactory, Connection connection) - { - return new ListeningTestDecorator(targetTest, enlistedClients, conversationFactory, connection); - } - - - /** - * The entry point for the interop test coordinator. This client accepts the following command line arguments: - * - * <p/><table> <tr><td> -b <td> The broker URL. <td> Mandatory. <tr><td> -h <td> The virtual host. - * <td> Optional. <tr><td> name=value <td> Trailing argument define name/value pairs. Added to system properties. - * <td> Optional. </table> - * - * @param args The command line arguments. - */ - public static void main(String[] args) - { - try - { - // Use the command line parser to evaluate the command line with standard handling behaviour (print errors - // and usage then exist if there are errors). - Properties options = - CommandLineParser.processCommandLine(args, - new CommandLineParser( - new String[][] - { - {"b", "The broker URL.", "broker", "false"}, - {"h", "The virtual host to use.", "virtual host", "false"}, - {"o", "The name of the directory to output test timings to.", "dir", "false"} - })); - - // Extract the command line options. - String brokerUrl = options.getProperty("b"); - String virtualHost = options.getProperty("h"); - String reportDir = options.getProperty("o"); - reportDir = (reportDir == null) ? "." : reportDir; - - - String[] testClassNames = {SustainedTestCoordinator.class.getName()}; - - // Create a coordinator and begin its test procedure. - Coordinator coordinator = new TestCoordinator(brokerUrl, virtualHost); - - coordinator.setReportDir(reportDir); - - TestResult testResult = coordinator.start(testClassNames); - - if (testResult.failureCount() > 0) - { - System.exit(FAILURE_EXIT); - } - else - { - System.exit(SUCCESS_EXIT); - } - } - catch (Exception e) - { - System.err.println(e.getMessage()); - log.error("Top level handler caught execption.", e); - System.exit(EXCEPTION_EXIT); - } - } -} |