diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-06-04 09:47:53 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-06-04 09:47:53 +0000 |
commit | b9a973e9bf930d188bfc805e39b1ff7353ae7a2b (patch) | |
tree | 9841cf8206859f453c8f49b693c8f892cd84c525 | |
parent | 3b5d4734b777b54b52ce2710f404143aca8c5c6e (diff) | |
download | qpid-python-b9a973e9bf930d188bfc805e39b1ff7353ae7a2b.tar.gz |
Addition of a sustained test client. This is currently setup for running a pub/sub test.
The test allows for multiple clients to connect and participate in testing the broker throughput.
A single producer continually sends messages to a topic which the clients then send batched results back about.
The producer uses the timings in the reports to update the rate at which it sends messages. Ideally reaching a steady state where all messages produced are received by everyone within a specified time frame.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@544109 13f79535-47bb-0310-9956-ffa450edef68
11 files changed, 1386 insertions, 85 deletions
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java index 3003c00ca5..31de84e630 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java @@ -21,10 +21,7 @@ */
package org.apache.qpid.interop.coordinator;
-import java.util.Collection;
import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.LinkedBlockingQueue;
import javax.jms.*;
@@ -74,13 +71,13 @@ public abstract class CoordinatingTestCase extends TestCase private static final Logger log = Logger.getLogger(CoordinatingTestCase.class);
/** Holds the contact details for the sending test client. */
- TestClientDetails sender;
+ protected TestClientDetails sender;
/** Holds the contact details for the receving test client. */
- TestClientDetails receiver;
+ protected TestClientDetails receiver;
/** Holds the conversation factory over which to coordinate the test. */
- ConversationFactory conversationFactory;
+ protected ConversationFactory conversationFactory;
/**
* Creates a new coordinating test case with the specified name.
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java index 2f9937cae9..0eb6be3a91 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java @@ -65,28 +65,28 @@ public class Coordinator extends TKTestRunner public static final String DEFAULT_CONNECTION_PROPS_RESOURCE = "org/apache/qpid/interop/connection.properties";
/** Holds the URL of the broker to coordinate the tests on. */
- String brokerUrl;
+ protected String brokerUrl;
/** Holds the virtual host to coordinate the tests on. If <tt>null</tt>, then the default virtual host is used. */
- String virtualHost;
+ protected String virtualHost;
/** Holds the list of all clients that enlisted, when the compulsory invite was issued. */
- Set<TestClientDetails> enlistedClients = new HashSet<TestClientDetails>();
+ protected Set<TestClientDetails> enlistedClients = new HashSet<TestClientDetails>();
/** Holds the conversation helper for the control conversation. */
- private ConversationFactory conversationFactory;
+ protected ConversationFactory conversationFactory;
/** Holds the connection that the coordinating messages are sent over. */
- private Connection connection;
+ protected Connection connection;
/**
* Holds the name of the class of the test currently being run. Ideally passed into the {@link #createTestResult}
* method, but as the signature is already fixed for this, the current value gets pushed here as a member variable.
*/
- private String currentTestClassName;
+ protected String currentTestClassName;
/** Holds the path of the directory to output test results too, if one is defined. */
- private static String reportDir;
+ protected static String _reportDir;
/**
* Creates an interop test coordinator on the specified broker and virtual host.
@@ -94,7 +94,7 @@ public class Coordinator extends TKTestRunner * @param brokerUrl The URL of the broker to connect to.
* @param virtualHost The virtual host to run all tests on. Optional, may be <tt>null</tt>.
*/
- Coordinator(String brokerUrl, String virtualHost)
+ public Coordinator(String brokerUrl, String virtualHost)
{
log.debug("Coordinator(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
@@ -121,38 +121,36 @@ public class Coordinator extends TKTestRunner // Use the command line parser to evaluate the command line with standard handling behaviour (print errors
// and usage then exist if there are errors).
Properties options =
- CommandLineParser.processCommandLine(args,
- new CommandLineParser(
- new String[][]
- {
- { "b", "The broker URL.", "broker", "false" },
- { "h", "The virtual host to use.", "virtual host", "false" },
- { "o", "The name of the directory to output test timings to.", "dir", "false" }
- }));
+ CommandLineParser.processCommandLine(args,
+ new CommandLineParser(
+ new String[][]
+ {
+ {"b", "The broker URL.", "broker", "false"},
+ {"h", "The virtual host to use.", "virtual host", "false"},
+ {"o", "The name of the directory to output test timings to.", "dir", "false"}
+ }));
// Extract the command line options.
String brokerUrl = options.getProperty("b");
String virtualHost = options.getProperty("h");
- reportDir = options.getProperty("o");
- reportDir = (reportDir == null) ? "." : reportDir;
+ _reportDir = options.getProperty("o");
+ _reportDir = (_reportDir == null) ? "." : _reportDir;
// Scan for available test cases using a classpath scanner.
Collection<Class<? extends CoordinatingTestCase>> testCaseClasses =
- new ArrayList<Class<? extends CoordinatingTestCase>>();
+ new ArrayList<Class<? extends CoordinatingTestCase>>();
// ClasspathScanner.getMatches(CoordinatingTestCase.class, "^Test.*", true);
// Hard code the test classes till the classpath scanner is fixed.
Collections.addAll(testCaseClasses,
- new Class[]
- {
- CoordinatingTestCase1DummyRun.class, CoordinatingTestCase2BasicP2P.class,
- CoordinatingTestCase3BasicPubSub.class
- });
+ CoordinatingTestCase1DummyRun.class,
+ CoordinatingTestCase2BasicP2P.class,
+ CoordinatingTestCase3BasicPubSub.class);
// Check that some test classes were actually found.
- if ((testCaseClasses == null) || testCaseClasses.isEmpty())
+ if (testCaseClasses.isEmpty())
{
throw new RuntimeException(
- "No test classes implementing CoordinatingTestCase were found on the class path.");
+ "No test classes implementing CoordinatingTestCase were found on the class path.");
}
int i = 0;
@@ -199,7 +197,7 @@ public class Coordinator extends TKTestRunner public TestResult start(String[] testClassNames) throws Exception
{
log.debug("public TestResult start(String[] testClassNames = " + PrettyPrintingUtils.printArray(testClassNames)
- + ": called");
+ + ": called");
// Connect to the broker.
connection = TestClient.createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost);
@@ -233,7 +231,7 @@ public class Coordinator extends TKTestRunner // Record the current test class, so that the test results can be output to a file incorporating this name.
this.currentTestClassName = testClassName;
- result = super.start(new String[] { testClassName });
+ result = super.start(new String[]{testClassName});
}
// At this point in time, all tests have completed. Broadcast the shutdown message.
@@ -257,7 +255,7 @@ public class Coordinator extends TKTestRunner public static Set<TestClientDetails> extractEnlists(Collection<Message> enlists) throws JMSException
{
log.debug("public static Set<TestClientDetails> extractEnlists(Collection<Message> enlists = " + enlists
- + "): called");
+ + "): called");
Set<TestClientDetails> enlistedClients = new HashSet<TestClientDetails>();
@@ -315,9 +313,9 @@ public class Coordinator extends TKTestRunner targetTest = new WrappedSuiteTestDecorator(suite);
log.debug("Wrapped with a WrappedSuiteTestDecorator.");
}
-
// Wrap the tests in an inviting test decorator, to perform the invite/test cycle.
- targetTest = new InvitingTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
+
+ targetTest = newTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
TestSuite suite = new TestSuite();
suite.addTest(targetTest);
@@ -328,6 +326,11 @@ public class Coordinator extends TKTestRunner return super.doRun(suite, wait);
}
+ protected WrappedSuiteTestDecorator newTestDecorator(WrappedSuiteTestDecorator targetTest, Set<TestClientDetails> enlistedClients, ConversationFactory conversationFactory, Connection connection)
+ {
+ return new InvitingTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
+ }
+
/**
* Creates the TestResult object to be used for test runs.
*
@@ -340,10 +343,10 @@ public class Coordinator extends TKTestRunner TKTestResult result = new TKTestResult(fPrinter.getWriter(), delay, verbose, testCaseName);
// Check if a directory to output reports to has been specified and attach test listeners if so.
- if (reportDir != null)
+ if (_reportDir != null)
{
// Create the report directory if it does not already exist.
- File reportDirFile = new File(reportDir);
+ File reportDirFile = new File(_reportDir);
if (!reportDirFile.exists())
{
@@ -382,4 +385,9 @@ public class Coordinator extends TKTestRunner return result;
}
+
+ public void setReportDir(String reportDir)
+ {
+ _reportDir = reportDir;
+ }
}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java index 858ed1a589..8695f7f66f 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java @@ -21,7 +21,6 @@ package org.apache.qpid.interop.coordinator;
import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
import javax.jms.Connection;
import javax.jms.Destination;
@@ -30,7 +29,6 @@ import javax.jms.Message; import junit.framework.Test;
import junit.framework.TestResult;
-import junit.framework.TestSuite;
import org.apache.log4j.Logger;
@@ -107,7 +105,7 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator CoordinatingTestCase coordTest = (CoordinatingTestCase) test;
// Broadcast the invitation to find out what clients are available to test.
- Set<TestClientDetails> enlists = null;
+ Set<TestClientDetails> enlists;
try
{
Message invite = conversationFactory.getSession().createMessage();
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java new file mode 100644 index 0000000000..1b4461f8c2 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.interop.coordinator; + +import javax.jms.Message; + +public interface ListeningCoordinatorTest +{ + public void latejoin(Message message); +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java new file mode 100644 index 0000000000..4312dfbcc6 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.coordinator; + +import junit.framework.Test; +import junit.framework.TestResult; +import org.apache.log4j.Logger; +import org.apache.qpid.util.ConversationFactory; +import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import java.util.Collection; +import java.util.Iterator; +import java.util.Set; + +/** + * <p><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Broadcast test + * invitations and collect enlists. <td> {@link ConversationFactory}. <tr><td> Output test failures for clients + * unwilling to run the test case. <td> {@link Coordinator} <tr><td> Execute coordinated test cases. <td> {@link + * CoordinatingTestCase} </table> + */ +public class ListeningTestDecorator extends WrappedSuiteTestDecorator implements MessageListener +{ + private static final Logger log = Logger.getLogger(ListeningTestDecorator.class); + + /** Holds the contact information for all test clients that are available and that may take part in the test. */ + Set<TestClientDetails> allClients; + + /** Holds the conversation helper for the control level conversation for coordinating the test through. */ + ConversationFactory conversationFactory; + + /** Holds the connection that the control conversation is held over. */ + Connection connection; + + /** Holds the underlying {@link CoordinatingTestCase}s that this decorator wraps. */ + WrappedSuiteTestDecorator testSuite; + + /** Hold the current running test case. */ + CoordinatingTestCase _currentTest = null; + + /** + * Creates a wrapped suite test decorator from another one. + * + * @param suite The test suite. + * @param availableClients The list of all clients that responded to the compulsory invite. + * @param controlConversation The conversation helper for the control level, test coordination conversation. + * @param controlConnection The connection that the coordination messages are sent over. + */ + public ListeningTestDecorator(WrappedSuiteTestDecorator suite, Set<TestClientDetails> availableClients, + ConversationFactory controlConversation, Connection controlConnection) + { + super(suite); + + log.debug("public InvitingTestDecorator(WrappedSuiteTestDecorator suite, Set<TestClientDetails> allClients = " + + availableClients + ", ConversationHelper controlConversation = " + controlConversation + "): called"); + + testSuite = suite; + allClients = availableClients; + conversationFactory = controlConversation; + connection = controlConnection; + } + + /** + * Broadcasts a test invitation and accetps enlisting from participating clients. The wrapped test case is then + * repeated for every combination of test clients (provided the wrapped test case extends {@link + * CoordinatingTestCase}. + * + * <p/>Any JMSExceptions during the invite/enlist conversation will be allowed to fall through as runtime + * exceptions, resulting in the non-completion of the test run. + * + * @param testResult The the results object to monitor the test results with. + * + * @todo Better error recovery for failure of the invite/enlist conversation could be added. + */ + public void run(TestResult testResult) + { + log.debug("public void run(TestResult testResult): called"); + + Collection<Test> tests = testSuite.getAllUnderlyingTests(); + + for (Test test : tests) + { + CoordinatingTestCase coordTest = (CoordinatingTestCase) test; + + Set<TestClientDetails> enlists = signupClients(coordTest); + + if (enlists.size() == 0) + { + throw new RuntimeException("No clients to test with"); + } + + Iterator<TestClientDetails> clients = enlists.iterator(); + coordTest.setSender(clients.next()); + + while (clients.hasNext()) + { + // Set the sending and receiving client details on the test case. + coordTest.setReceiver(clients.next()); + } + + // Pass down the connection to hold the coordination conversation over. + coordTest.setConversationFactory(conversationFactory); + + + if (coordTest instanceof ListeningCoordinatorTest) + { + _currentTest = coordTest; + } + // Execute the test case. + coordTest.run(testResult); + + _currentTest = null; + } + } + + private Set<TestClientDetails> signupClients(CoordinatingTestCase coordTest) + { + // Broadcast the invitation to find out what clients are available to test. + Set<TestClientDetails> enlists; + try + { + Message invite = conversationFactory.getSession().createMessage(); + Destination controlTopic = conversationFactory.getSession().createTopic("iop.control"); + ConversationFactory.Conversation conversation = conversationFactory.startConversation(); + + invite.setStringProperty("CONTROL_TYPE", "INVITE"); + invite.setStringProperty("TEST_NAME", coordTest.getTestCaseNameForTestMethod(coordTest.getName())); + + conversation.send(controlTopic, invite); + + // Wait for a short time, to give test clients an opportunity to reply to the invitation. + Collection<Message> replies = conversation.receiveAll(allClients.size(), 5000); + + log.debug("Received " + replies.size() + " enlist replies"); + + enlists = Coordinator.extractEnlists(replies); + + //Create topic to listen on for latejoiners + Destination listenTopic = conversationFactory.getSession().createTopic("iop.control.test." + coordTest.getTestCaseNameForTestMethod(coordTest.getName())); + + //Listen for joiners + conversationFactory.getSession().createConsumer(listenTopic).setMessageListener(this); + log.debug("Created consumer on :" + listenTopic); + } + catch (JMSException e) + { + throw new RuntimeException("There was a JMSException during the invite/enlist conversation.", e); + } + + return enlists; + } + + /** + * Prints a string summarizing this test decorator, mainly for debugging purposes. + * + * @return String representation for debugging purposes. + */ + public String toString() + { + return "ListeningTestDecorator: [ testSuite = " + testSuite + " ]"; + } + + + public void onMessage(Message message) + { + try + { + if (message.getStringProperty("CONTROL_TYPE").equals("LATEJOIN")) + { + ((ListeningCoordinatorTest) _currentTest).latejoin(message); + } + } + catch (JMSException e) + { + log.debug("Unable to process message:" + message); + } + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java index 0b9c72e1b6..6cca23446f 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java @@ -75,16 +75,17 @@ public class TestClient implements MessageListener /** Holds all the test cases loaded from the classpath. */
Map<String, InteropClientTestCase> testCases = new HashMap<String, InteropClientTestCase>();
- InteropClientTestCase currentTestCase;
+ protected InteropClientTestCase currentTestCase;
- private MessageProducer producer;
- private Session session;
+ protected Connection _connection;
+ protected MessageProducer producer;
+ protected Session session;
- private String clientName = CLIENT_NAME;
+ protected String clientName = CLIENT_NAME;
/**
- * Creates a new interop test client, listenting to the specified broker and virtual host, with the specified
- * client identifying name.
+ * Creates a new interop test client, listenting to the specified broker and virtual host, with the specified client
+ * identifying name.
*
* @param brokerUrl The url of the broker to connect to.
* @param virtualHost The virtual host to conect to.
@@ -93,7 +94,7 @@ public class TestClient implements MessageListener public TestClient(String brokerUrl, String virtualHost, String clientName)
{
log.debug("public TestClient(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost
- + ", String clientName = " + clientName + "): called");
+ + ", String clientName = " + clientName + "): called");
// Retain the connection parameters.
this.brokerUrl = brokerUrl;
@@ -117,13 +118,13 @@ public class TestClient implements MessageListener {
// Use the command line parser to evaluate the command line.
CommandLineParser commandLine =
- new CommandLineParser(
- new String[][]
- {
- { "b", "The broker URL.", "broker", "false" },
- { "h", "The virtual host to use.", "virtual host", "false" },
- { "n", "The test client name.", "name", "false" }
- });
+ new CommandLineParser(
+ new String[][]
+ {
+ {"b", "The broker URL.", "broker", "false"},
+ {"h", "The virtual host to use.", "virtual host", "false"},
+ {"n", "The test client name.", "name", "false"}
+ });
// Capture the command line arguments or display errors and correct usage and then exit.
Properties options = null;
@@ -151,9 +152,17 @@ public class TestClient implements MessageListener // Create a test client and start it running.
TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName);
+ // Use a class path scanner to find all the interop test case implementations.
+ Collection<Class<? extends InteropClientTestCase>> testCaseClasses =
+ new ArrayList<Class<? extends InteropClientTestCase>>();
+ // ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true);
+ // Hard code the test classes till the classpath scanner is fixed.
+ Collections.addAll(testCaseClasses,
+ new Class[]{TestCase1DummyRun.class, TestCase2BasicP2P.class, TestClient.class});
+
try
{
- client.start();
+ client.start(testCaseClasses);
}
catch (Exception e)
{
@@ -165,20 +174,12 @@ public class TestClient implements MessageListener /**
* Starts the interop test client running. This causes it to start listening for incoming test invites.
*
- * @throws JMSException Any underlying JMSExceptions are allowed to fall through.
+ * @throws JMSException Any underlying JMSExceptions are allowed to fall through. @param testCaseClasses
*/
- private void start() throws JMSException
+ protected void start(Collection<Class<? extends InteropClientTestCase>> testCaseClasses) throws JMSException
{
log.debug("private void start(): called");
- // Use a class path scanner to find all the interop test case implementations.
- Collection<Class<? extends InteropClientTestCase>> testCaseClasses =
- new ArrayList<Class<? extends InteropClientTestCase>>();
- // ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true);
- // Hard code the test classes till the classpath scanner is fixed.
- Collections.addAll(testCaseClasses,
- new Class[] { TestCase1DummyRun.class, TestCase2BasicP2P.class, TestCase3BasicPubSub.class });
-
// Create all the test case implementations and index them by the test names.
for (Class<? extends InteropClientTestCase> nextClass : testCaseClasses)
{
@@ -200,9 +201,9 @@ public class TestClient implements MessageListener }
// Open a connection to communicate with the coordinator on.
- Connection connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost);
+ _connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost);
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Set this up to listen for control messages.
MessageConsumer consumer = session.createConsumer(session.createTopic("iop.control." + clientName));
@@ -215,7 +216,7 @@ public class TestClient implements MessageListener producer = session.createProducer(null);
// Start listening for incoming control messages.
- connection.start();
+ _connection.start();
}
/**
@@ -232,22 +233,25 @@ public class TestClient implements MessageListener * @param virtualHost The virtual host to connectio to, <tt>null</tt> to use the default.
*
* @return A JMS conneciton.
+ *
+ * @todo Make username/password configurable. Allow multiple urls for fail over. Once it feels right, move it to a
+ * Utils library class.
*/
public static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost)
{
log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource
- + ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
+ + ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
try
{
Properties connectionProps =
- PropertiesUtils.getProperties(TestClient.class.getClassLoader().getResourceAsStream(
- connectionPropsResource));
+ PropertiesUtils.getProperties(TestClient.class.getClassLoader().getResourceAsStream(
+ connectionPropsResource));
if (brokerUrl != null)
{
String connectionString =
- "amqp://guest:guest/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'";
+ "amqp://guest:guest/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'";
connectionProps.setProperty(CONNECTION_PROPERTY, connectionString);
}
@@ -286,21 +290,21 @@ public class TestClient implements MessageListener String controlType = message.getStringProperty("CONTROL_TYPE");
String testName = message.getStringProperty("TEST_NAME");
+ log.info("onMessage(Message message = " + message + "): for '" + controlType + "' to '" + testName + "'");
+
// Check if the message is a test invite.
if ("INVITE".equals(controlType))
{
- String testCaseName = message.getStringProperty("TEST_NAME");
-
// Flag used to indicate that an enlist should be sent. Only enlist to compulsory invites or invites
// for which test cases exist.
boolean enlist = false;
- if (testCaseName != null)
+ if (testName != null)
{
- log.debug("Got an invite to test: " + testCaseName);
+ log.debug("Got an invite to test: " + testName);
// Check if the requested test case is available.
- InteropClientTestCase testCase = testCases.get(testCaseName);
+ InteropClientTestCase testCase = testCases.get(testName);
if (testCase != null)
{
@@ -308,6 +312,10 @@ public class TestClient implements MessageListener currentTestCase = testCase;
enlist = true;
}
+ else
+ {
+ log.warn("'" + testName + "' not part of this clients tests.");
+ }
}
else
{
@@ -325,6 +333,8 @@ public class TestClient implements MessageListener enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName);
enlistMessage.setJMSCorrelationID(message.getJMSCorrelationID());
+ log.info("Sending Message '" + enlistMessage + "'. to " + message.getJMSReplyTo());
+
producer.send(message.getJMSReplyTo(), enlistMessage);
}
}
@@ -369,9 +379,10 @@ public class TestClient implements MessageListener }
else if ("TERMINATE".equals(controlType))
{
- System.out.println("Received termination instruction from coordinator.");
+ log.info("Received termination instruction from coordinator.");
// Is a cleaner shutdown needed?
+ _connection.close();
System.exit(0);
}
else
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java index 2773cad3f3..ec8c72afa9 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java @@ -5,7 +5,6 @@ import javax.jms.*; import org.apache.log4j.Logger;
import org.apache.qpid.interop.testclient.InteropClientTestCase;
-import org.apache.qpid.interop.testclient.TestClient;
/**
* Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the
@@ -120,8 +119,8 @@ public class TestCase3BasicPubSub implements InteropClientTestCase session = new Session[1];
connection[0] =
- TestClient.createConnection(TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, TestClient.brokerUrl,
- TestClient.virtualHost);
+ org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl,
+ org.apache.qpid.interop.testclient.TestClient.virtualHost);
session[0] = connection[0].createSession(false, Session.AUTO_ACKNOWLEDGE);
// Extract and retain the test parameters.
@@ -140,8 +139,8 @@ public class TestCase3BasicPubSub implements InteropClientTestCase for (int i = 0; i < numReceivers; i++)
{
connection[i] =
- TestClient.createConnection(TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, TestClient.brokerUrl,
- TestClient.virtualHost);
+ org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl,
+ org.apache.qpid.interop.testclient.TestClient.virtualHost);
session[i] = connection[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
sendDestination = session[i].createTopic(sendKey);
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java new file mode 100644 index 0000000000..3a0d587d44 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java @@ -0,0 +1,567 @@ +package org.apache.qpid.sustained; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQNoConsumersException; +import org.apache.qpid.client.AMQNoRouteException; +import org.apache.qpid.interop.testclient.testcases.TestCase3BasicPubSub; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.HashMap; +import java.util.Map; + +/** + * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the + * default topic exchange, using the specified number of receiver connections. Produces reports on the actual number of + * messages sent/received. + * + * <p><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Supply the name + * of the test case that this implements. <tr><td> Accept/Reject invites based on test parameters. <tr><td> Adapt to + * assigned roles. <tr><td> Send required number of test messages using pub/sub. <tr><td> Generate test reports. + * </table> + */ +public class SustainedTestClient extends TestCase3BasicPubSub implements ExceptionListener +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(SustainedTestClient.class); + + /** The role to be played by the test. */ + private Roles role; + + /** The number of test messages to send. */ +// private int numMessages; + + /** The number of receiver connection to use. */ + private int numReceivers; + + /** The routing key to send them to on the default direct exchange. */ + private Destination sendDestination; + + /** The routing key to send updates to on the default direct exchange. */ + private Destination sendUpdateDestination; + + + /** The connections to send/receive the test messages on. */ + private Connection[] connection; + + /** The sessions to send/receive the test messages on. */ + private Session[] session; + + /** The producer to send the test messages with. */ + MessageProducer producer; + + /** Adapter that adjusts the send rate based on the updates from clients. */ + SustainedRateAdapter _rateAdapter; + + /** */ + int updateInterval; + + private boolean _running = true; + + /** + * Should provide the name of the test case that this class implements. The exact names are defined in the interop + * testing spec. + * + * @return The name of the test case that this implements. + */ + public String getName() + { + log.debug("public String getName(): called"); + + return "Perf_SustainedPubSub"; + } + + /** + * Assigns the role to be played by this test case. The test parameters are fully specified in the assignment + * message. When this method return the test case will be ready to execute. + * + * @param role The role to be played; sender or receiver. + * @param assignRoleMessage The role assingment message, contains the full test parameters. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public void assignRole(Roles role, Message assignRoleMessage) throws JMSException + { + log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage + + "): called"); + + // Take note of the role to be played. + this.role = role; + + // Extract and retain the test parameters. + numReceivers = assignRoleMessage.getIntProperty("SUSTAINED_NUM_RECEIVERS"); + updateInterval = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL"); + String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY"); + String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY"); + int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE"); + + log.debug("numReceivers = " + numReceivers); + log.debug("updateInterval = " + updateInterval); + log.debug("ackMode = " + ackMode); + log.debug("sendKey = " + sendKey); + log.debug("sendUpdateKey = " + sendUpdateKey); + log.debug("role = " + role); + + switch (role) + { + // Check if the sender role is being assigned, and set up a single message producer if so. + case SENDER: + log.info("*********** Creating SENDER"); + // Create a new connection to pass the test messages on. + connection = new Connection[1]; + session = new Session[1]; + + connection[0] = + org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl, + org.apache.qpid.interop.testclient.TestClient.virtualHost); + session[0] = connection[0].createSession(false, ackMode); + + // Extract and retain the test parameters. + sendDestination = session[0].createTopic(sendKey); + + connection[0].setExceptionListener(this); + + producer = session[0].createProducer(sendDestination); + + sendUpdateDestination = session[0].createTopic(sendUpdateKey); + MessageConsumer updateConsumer = session[0].createConsumer(sendUpdateDestination); + + _rateAdapter = new SustainedRateAdapter(this); + updateConsumer.setMessageListener(_rateAdapter); + + + break; + + // Otherwise the receiver role is being assigned, so set this up to listen for messages on the required number + // of receiver connections. + case RECEIVER: + log.info("*********** Creating RECEIVER"); + // Create the required number of receiver connections. + connection = new Connection[numReceivers]; + session = new Session[numReceivers]; + + for (int i = 0; i < numReceivers; i++) + { + connection[i] = + org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, + org.apache.qpid.interop.testclient.TestClient.brokerUrl, + org.apache.qpid.interop.testclient.TestClient.virtualHost); + session[i] = connection[i].createSession(false, ackMode); + + sendDestination = session[i].createTopic(sendKey); + + sendUpdateDestination = session[i].createTopic(sendUpdateKey); + + MessageConsumer consumer = session[i].createConsumer(sendDestination); + + consumer.setMessageListener(new SustainedListener(TestClient.CLIENT_NAME + "-" + i, updateInterval, session[i], sendUpdateDestination)); + } + + break; + } + + // Start all the connection dispatcher threads running. + for (int i = 0; i < connection.length; i++) + { + connection[i].start(); + } + } + + /** Performs the test case actions. */ + public void start() throws JMSException + { + log.debug("public void start(): called"); + + // Check that the sender role is being performed. + switch (role) + { + // Check if the sender role is being assigned, and set up a single message producer if so. + case SENDER: + Message testMessage = session[0].createTextMessage("test"); + +// for (int i = 0; i < numMessages; i++) + while (_running) + { + producer.send(testMessage); + + _rateAdapter.sentMessage(); + } + break; + case RECEIVER: + + } + } + + /** + * Gets a report on the actions performed by the test case in its assigned role. + * + * @param session The session to create the report message in. + * + * @return The report message. + * + * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. + */ + public Message getReport(Session session) throws JMSException + { + log.debug("public Message getReport(Session session): called"); + + // Close the test connections. + for (int i = 0; i < connection.length; i++) + { + connection[i].close(); + } + + Message report = session.createMessage(); + report.setStringProperty("CONTROL_TYPE", "REPORT"); + + return report; + } + + public void onException(JMSException jmsException) + { + Exception linked = jmsException.getLinkedException(); + + if (linked != null) + { + if (linked instanceof AMQNoRouteException) + { + log.warn("No route ."); + } + else if (linked instanceof AMQNoConsumersException) + { + log.warn("No clients currently available for message:" + ((AMQNoConsumersException) linked).getUndeliveredMessage()); + } + else + { + + log.warn("LinkedException:" + linked); + } + + _rateAdapter.NO_CLIENTS = true; + } + else + { + log.warn("Exception:" + linked); + } + } + + class SustainedListener implements MessageListener + { + private int _received = 0; + private int _updateInterval = 0; + private Long _time; + MessageProducer _updater; + Session _session; + String _client; + + + public SustainedListener(String clientname, int updateInterval, Session session, Destination sendDestination) throws JMSException + { + _updateInterval = updateInterval; + _client = clientname; + _session = session; + _updater = session.createProducer(sendDestination); + } + + public void setReportInterval(int reportInterval) + { + _updateInterval = reportInterval; + _received = 0; + } + + public void onMessage(Message message) + { + if (log.isDebugEnabled()) + { + log.debug("Message " + _received + "received in listener"); + } + + if (message instanceof TextMessage) + { + + try + { + if (((TextMessage) message).getText().equals("test")) + { + if (_received == 0) + { + _time = System.nanoTime(); + sendStatus(0, _received); + } + + _received++; + + if (_received % _updateInterval == 0) + { + Long currentTime = System.nanoTime(); + + try + { + sendStatus(currentTime - _time, _received); + _time = currentTime; + } + catch (JMSException e) + { + log.error("Unable to send update."); + } + } + + } + } + catch (JMSException e) + { + //ignore error + } + } + } + + private void sendStatus(long time, int received) throws JMSException + { + Message updateMessage = _session.createTextMessage("update"); + updateMessage.setStringProperty("CLIENT_ID", _client); + updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE"); + updateMessage.setLongProperty("RECEIVED", received); + updateMessage.setLongProperty("DURATION", time); + + log.info("**** SENDING **** CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time); + + _updater.send(updateMessage); + } + + } + + class SustainedRateAdapter implements MessageListener + { + private SustainedTestClient _client; + private long _variance = 250; //no. messages to allow drifting + private volatile long _delay; //in nanos + private long _sent; + private Map<String, Long> _slowClients = new HashMap<String, Long>(); + private static final long PAUSE_SLEEP = 10; // 10 ms + private static final long NO_CLIENT_SLEEP = 1000; // 1s + private static final long MAX_MESSAGE_DRIFT = 1000; // no messages drifted from producer + private volatile boolean NO_CLIENTS = true; + private int _delayShifting; + private static final int REPORTS_WITHOUT_CHANGE = 10; + private static final double MAXIMUM_DELAY_SHIFT = .02; //2% + + SustainedRateAdapter(SustainedTestClient client) + { + _client = client; + } + + public void onMessage(Message message) + { + if (log.isDebugEnabled()) + { + log.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called"); + } + + try + { + String controlType = message.getStringProperty("CONTROL_TYPE"); + + // Check if the message is a test invite. + if ("UPDATE".equals(controlType)) + { + NO_CLIENTS = false; + long duration = message.getLongProperty("DURATION"); + long received = message.getLongProperty("RECEIVED"); + String client = message.getStringProperty("CLIENT_ID"); + + log.info("**** SENDING **** CLIENT_ID:" + client + " RECEIVED:" + received + " DURATION:" + duration); + + + recordSlow(client, received); + + adjustDelay(client, received, duration); + } + } + catch (JMSException e) + { + // + } + } + + class Pair<X, Y> + { + X item1; + Y item2; + + Pair(X i1, Y i2) + { + item1 = i1; + item2 = i2; + } + + X getItem1() + { + return item1; + } + + Y getItem2() + { + return item2; + } + } + + Map<String, Pair<Long, Long>> delays = new HashMap<String, Pair<Long, Long>>(); + Long totalReceived = 0L; + Long totalDuration = 0L; + + private void adjustDelay(String client, long received, long duration) + { + Pair<Long, Long> current = delays.get(client); + + if (current == null) + { + delays.put(client, new Pair<Long, Long>(received, duration)); + } + else + { + //reduce totals + totalReceived -= current.getItem1(); + totalDuration -= current.getItem2(); + } + + totalReceived += received; + totalDuration += duration; + + long averageDuration = totalDuration / delays.size(); + + long diff = Math.abs(_delay - averageDuration); + + //if the averageDuration differs from the current by more than the specified variane then adjust delay. + if (diff > _variance) + { + if (averageDuration > _delay) + { + // we can go faster + _delay -= diff; + if (_delay < 0) + { + _delay = 0; + } + } + else + { + // we need to slow down + _delay += diff; + } + delayChanged(); + } + else + { + delayStable(); + } + + } + + private void delayChanged() + { + _delayShifting = REPORTS_WITHOUT_CHANGE; + } + + private void delayStable() + { + _delayShifting--; + + if (_delayShifting < 0) + { + _delayShifting = 0; + log.info("Delay stabilised:" + _delay); + } + } + + // Record Slow clients + private void recordSlow(String client, long received) + { + if (received < (_sent - _variance)) + { + _slowClients.put(client, received); + } + else + { + _slowClients.remove(client); + } + } + + public void sentMessage() + { + if (_sent % updateInterval == 0) + { + + // Cause test to pause when we have slow + if (!_slowClients.isEmpty() || NO_CLIENTS) + { + log.info("Pausing for slow clients"); + + //_delay <<= 1; + + while (!_slowClients.isEmpty()) + { + sleep(PAUSE_SLEEP); + } + + if (NO_CLIENTS) + { + sleep(NO_CLIENT_SLEEP); + } + + log.debug("Continuing"); + return; + } + else + { + log.info("Delay:" + _delay); + } + } + + _sent++; + + if (_delay > 0) + { + // less than 10ms sleep doesn't work. + // _delay is in nano seconds + if (_delay < 1000000) + { + sleep(0, (int) _delay); + } + else + { + if (_delay < 30000000000L) + { + sleep(_delay / 1000000, (int) (_delay % 1000000)); + } + } + } + } + + private void sleep(long sleep) + { + sleep(sleep, 0); + } + + private void sleep(long milli, int nano) + { + try + { + log.debug("Sleep:" + milli + ":" + nano); + Thread.sleep(milli, nano); + } + catch (InterruptedException e) + { + // + } + } + } + +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java new file mode 100644 index 0000000000..4081d87192 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.sustained; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.interop.coordinator.ListeningCoordinatorTest; +import org.apache.qpid.interop.coordinator.TestClientDetails; +import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase3BasicPubSub; +import org.apache.qpid.util.ConversationFactory; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub implements ListeningCoordinatorTest +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(SustainedTestCoordinator.class); + private List<TestClientDetails> _receivers; + private static final String SUSTAINED_KEY = "Perf_SustainedPubSub"; + Map<String, Object> _testProperties; + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public SustainedTestCoordinator(String name) + { + super(name); + _receivers = new LinkedList(); + } + + /** + * Adds a receiver to this test. + * + * @param receiver The contact details of the sending client in the test. + */ + public void setReceiver(TestClientDetails receiver) + { + _receivers.add(receiver); + } + + + /** + * Performs the a single test run + * + * @throws Exception if there was a problem running the test. + */ + public void testBasicPubSub() throws Exception + { + log.debug("public void testSinglePubSubCycle(): called"); + + Map<String, Object> testConfig = new HashMap<String, Object>(); + testConfig.put("TEST_NAME", "Perf_SustainedPubSub"); + testConfig.put("SUSTAINED_KEY", SUSTAINED_KEY); + //testConfig.put("SUSTAINED_MSG_RATE", 10); + testConfig.put("SUSTAINED_NUM_RECEIVERS", 2); + testConfig.put("SUSTAINED_UPDATE_INTERVAL", 25); + testConfig.put("SUSTAINED_UPDATE_KEY", SUSTAINED_KEY + ".UPDATE"); + testConfig.put("ACKNOWLEDGE_MODE", AMQSession.NO_ACKNOWLEDGE); + + sequenceTest(testConfig); + } + + /** + * Holds a test coordinating conversation with the test clients. This is the basic implementation of the inner loop + * of Use Case 5. It consists of assigning the test roles, begining the test and gathering the test reports from the + * participants. + * + * @param testProperties The test case definition. + * + * @return The test results from the senders and receivers. + * + * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through. + */ + protected Message[] sequenceTest(Map<String, Object> testProperties) throws JMSException + { + log.debug("protected Message[] sequenceTest(Object... testProperties = " + testProperties + "): called"); + + Session session = conversationFactory.getSession(); + Destination senderControlTopic = session.createTopic(sender.privateControlKey); + + ConversationFactory.Conversation senderConversation = conversationFactory.startConversation(); + + // Assign the sender role to the sending test client. + Message assignSender = conversationFactory.getSession().createMessage(); + setPropertiesOnMessage(assignSender, testProperties); + assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); + assignSender.setStringProperty("ROLE", "SENDER"); + + senderConversation.send(senderControlTopic, assignSender); + + //Assign and wait for the receiver ckuebts to be ready. + _testProperties = testProperties; + + // Wait for the senders to confirm their roles. + senderConversation.receive(); + + assignReceivers(); + + // Start the test. + Message start = session.createMessage(); + start.setStringProperty("CONTROL_TYPE", "START"); + + senderConversation.send(senderControlTopic, start); + + // Wait for the test sender to return its report. + Message senderReport = senderConversation.receive(); + + try + { + Thread.sleep(500); + } + catch (InterruptedException e) + { + } + + // Ask the receiver for its report. + Message statusRequest = session.createMessage(); + statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST"); + + + return new Message[]{senderReport}; + } + + private void assignReceivers() + { + for (TestClientDetails receiver : _receivers) + { + registerReceiver(receiver); + } + } + + private void registerReceiver(TestClientDetails receiver) + { + log.info("registerReceiver called for receiver:" + receiver); + try + { + Session session = conversationFactory.getSession(); + Destination receiverControlTopic = session.createTopic(receiver.privateControlKey); + ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation(); + // Assign the receiver role the receiving client. + Message assignReceiver = session.createMessage(); + setPropertiesOnMessage(assignReceiver, _testProperties); + assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); + assignReceiver.setStringProperty("ROLE", "RECEIVER"); + + receiverConversation.send(receiverControlTopic, assignReceiver); + + //Don't wait for receiver to be ready.... we can't this is being done in + // the dispatcher thread, and most likely the acceptance message we + // want is sitting in the Dispatcher._queue waiting its turn for being + // dispatched so if we block here we won't can't get the message. + // So assume consumer is ready for action. + //receiverConversation.receive(); + } + catch (JMSException e) + { + log.warn("Unable to assign receiver:" + receiver + ". Due to:" + e.getMessage()); + } + } + + public void latejoin(Message message) + { + try + { + + TestClientDetails clientDetails = new TestClientDetails(); + clientDetails.clientName = message.getStringProperty("CLIENT_NAME"); + clientDetails.privateControlKey = message.getStringProperty("CLIENT_PRIVATE_CONTROL_KEY"); + + + registerReceiver(clientDetails); + } + catch (JMSException e) + { + //swallow + } + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the interop + * testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "Perf_SustainedPubSub"; + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java new file mode 100644 index 0000000000..44fc090410 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.sustained; + +import org.apache.log4j.Logger; +import org.apache.qpid.interop.testclient.InteropClientTestCase; +import org.apache.qpid.util.CommandLineParser; + +import javax.jms.JMSException; +import javax.jms.Message; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Properties; + +public class TestClient extends org.apache.qpid.interop.testclient.TestClient +{ + private static Logger log = Logger.getLogger(TestClient.class); + + /** + * Creates a new interop test client, listenting to the specified broker and virtual host, with the specified client + * identifying name. + * + * @param brokerUrl The url of the broker to connect to. + * @param virtualHost The virtual host to conect to. + * @param clientName The client name to use. + */ + public TestClient(String brokerUrl, String virtualHost, String clientName) + { + super(brokerUrl, virtualHost, clientName); + } + + /** + * The entry point for the interop test coordinator. This client accepts the following command line arguments: + * + * <p/><table> <tr><td> -b <td> The broker URL. <td> Optional. <tr><td> -h <td> The virtual + * host. <td> Optional. <tr><td> -n <td> The test client name. <td> Optional. <tr><td> name=value <td> + * Trailing argument define name/value pairs. Added to system properties. <td> Optional. </table> + * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + // Use the command line parser to evaluate the command line. + CommandLineParser commandLine = + new CommandLineParser( + new String[][] + { + {"b", "The broker URL.", "broker", "false"}, + {"h", "The virtual host to use.", "virtual host", "false"}, + {"n", "The test client name.", "name", "false"}, + {"j", "Join this test client to running test.", "join", ""} + }); + + // Capture the command line arguments or display errors and correct usage and then exit. + Properties options = null; + + try + { + options = commandLine.parseCommandLine(args); + } + catch (IllegalArgumentException e) + { + System.out.println(commandLine.getErrors()); + System.out.println(commandLine.getUsage()); + System.exit(1); + } + + // Extract the command line options. + String brokerUrl = options.getProperty("b"); + String virtualHost = options.getProperty("h"); + String clientName = options.getProperty("n"); + String join = options.getProperty("j"); + + // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up + // overridden values from there. + commandLine.addCommandLineToSysProperties(); + + // Create a test client and start it running. + TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName); + + // Use a class path scanner to find all the interop test case implementations. + Collection<Class<? extends InteropClientTestCase>> testCaseClasses = + new ArrayList<Class<? extends InteropClientTestCase>>(); + // ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true); + // Hard code the test classes till the classpath scanner is fixed. + Collections.addAll(testCaseClasses, + SustainedTestClient.class); + + + try + { + client.start(testCaseClasses, join); + } + catch (Exception e) + { + log.error("The test client was unable to start.", e); + System.exit(1); + } + } + + protected void start(Collection<Class<? extends InteropClientTestCase>> testCaseClasses, String join) throws JMSException, ClassNotFoundException + { + super.start(testCaseClasses); + log.debug("private void start(): called"); + + if (join != null && !join.equals("")) + { + Message latejoin = session.createMessage(); + + try + { + Object test = Class.forName(join).newInstance(); + if (test instanceof InteropClientTestCase) + { + currentTestCase = (InteropClientTestCase) test; + } + else + { + throw new RuntimeException("Requested to join class '" + join + "' but this is not a InteropClientTestCase."); + } + + latejoin.setStringProperty("CONTROL_TYPE", "LATEJOIN"); + latejoin.setStringProperty("CLIENT_NAME", clientName); + latejoin.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName); + producer.send(session.createTopic("iop.control.test." + currentTestCase.getName()), latejoin); + } + catch (InstantiationException e) + { + log.warn("Unable to request latejoining of test:" + currentTestCase); + } + catch (IllegalAccessException e) + { + log.warn("Unable to request latejoining of test:" + currentTestCase); + } + } + } + +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java new file mode 100644 index 0000000000..7e12fe39fb --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.sustained; + +import org.apache.qpid.interop.coordinator.Coordinator; +import org.apache.qpid.interop.coordinator.ListeningTestDecorator; +import org.apache.qpid.interop.coordinator.TestClientDetails; +import org.apache.qpid.util.CommandLineParser; +import org.apache.qpid.util.ConversationFactory; +import org.apache.log4j.Logger; + +import java.util.Properties; +import java.util.Set; + +import junit.framework.TestResult; +import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; + +import javax.jms.Connection; + +public class TestCoordinator extends Coordinator +{ + + private static final Logger log = Logger.getLogger(TestCoordinator.class); + + /** + * Creates an interop test coordinator on the specified broker and virtual host. + * + * @param brokerUrl The URL of the broker to connect to. + * @param virtualHost The virtual host to run all tests on. Optional, may be <tt>null</tt>. + */ + TestCoordinator(String brokerUrl, String virtualHost) + { + super(brokerUrl, virtualHost); + } + + protected WrappedSuiteTestDecorator newTestDecorator(WrappedSuiteTestDecorator targetTest, Set<TestClientDetails> enlistedClients, ConversationFactory conversationFactory, Connection connection) + { + return new ListeningTestDecorator(targetTest, enlistedClients, conversationFactory, connection); + } + + + /** + * The entry point for the interop test coordinator. This client accepts the following command line arguments: + * + * <p/><table> <tr><td> -b <td> The broker URL. <td> Mandatory. <tr><td> -h <td> The virtual host. + * <td> Optional. <tr><td> name=value <td> Trailing argument define name/value pairs. Added to system properties. + * <td> Optional. </table> + * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + try + { + // Use the command line parser to evaluate the command line with standard handling behaviour (print errors + // and usage then exist if there are errors). + Properties options = + CommandLineParser.processCommandLine(args, + new CommandLineParser( + new String[][] + { + {"b", "The broker URL.", "broker", "false"}, + {"h", "The virtual host to use.", "virtual host", "false"}, + {"o", "The name of the directory to output test timings to.", "dir", "false"} + })); + + // Extract the command line options. + String brokerUrl = options.getProperty("b"); + String virtualHost = options.getProperty("h"); + String reportDir = options.getProperty("o"); + reportDir = (reportDir == null) ? "." : reportDir; + + + String[] testClassNames = {SustainedTestCoordinator.class.getName()}; + + // Create a coordinator and begin its test procedure. + Coordinator coordinator = new TestCoordinator(brokerUrl, virtualHost); + + coordinator.setReportDir(reportDir); + + TestResult testResult = coordinator.start(testClassNames); + + if (testResult.failureCount() > 0) + { + System.exit(FAILURE_EXIT); + } + else + { + System.exit(SUCCESS_EXIT); + } + } + catch (Exception e) + { + System.err.println(e.getMessage()); + log.error("Top level handler caught execption.", e); + System.exit(EXCEPTION_EXIT); + } + } +} |