summaryrefslogtreecommitdiff
path: root/java/integrationtests
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-07-31 22:34:12 +0000
committerRafael H. Schloming <rhs@apache.org>2007-07-31 22:34:12 +0000
commit902481c5caf3a72538586a68cb779ddb9aa60c58 (patch)
tree6efbbe527de4a95a34a0033d9e87b3d848a90bcd /java/integrationtests
parentc11f9a79eec63da7aa6e6dac248a689a9d461beb (diff)
downloadqpid-python-902481c5caf3a72538586a68cb779ddb9aa60c58.tar.gz
Rolled back revision 561365 and commented out some broken code in ClientSession.java. The trunk should now build.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@561578 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/integrationtests')
-rw-r--r--java/integrationtests/docs/RunningSustainedTests.txt14
-rw-r--r--java/integrationtests/pom.xml6
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java263
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java388
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java220
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java28
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java200
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java65
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java87
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java402
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java (renamed from java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase1DummyRun.java)35
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java (renamed from java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase2BasicP2P.java)42
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java (renamed from java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase3BasicPubSub.java)34
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/old/Listener.java291
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/old/Publisher.java244
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/CircuitTestCase.java101
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java (renamed from java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/InteropClientTestCase.java)21
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java (renamed from java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java)229
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java (renamed from java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase1DummyRun.java)51
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java (renamed from java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase2BasicP2P.java)33
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java (renamed from java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase3BasicPubSub.java)56
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java905
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java126
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java8
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java222
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java157
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java117
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java234
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java479
-rw-r--r--java/integrationtests/src/resources/org/apache/qpid/interop/connection.properties20
30 files changed, 3696 insertions, 1382 deletions
diff --git a/java/integrationtests/docs/RunningSustainedTests.txt b/java/integrationtests/docs/RunningSustainedTests.txt
index db4405a32d..2b37f4c5a7 100644
--- a/java/integrationtests/docs/RunningSustainedTests.txt
+++ b/java/integrationtests/docs/RunningSustainedTests.txt
@@ -1,17 +1,15 @@
In addition to the integration tests the framework provided by this package also allows for
sustained tests to be run. Currently avaible tests:
-- org.apache.qpid.sustained.SustainedClientTestCase : Pub Sub test to determine steady state throughput.
+- org.apache.qpid.sustained.SustainedTestClient : Pub Sub test to determine steady state throughput.
Running Tests.
Run the tests as per the integration tests.
-- Start a broker
-- Start at least one test client [java org.apache.qpid.interop.TestClient], ensuring unique naming.
-
-- Start the test coordinator with the 'fanout' engine, on the sustained test case [java org.apache.qpid.test.framework.distributedtesting.Coordinator]
-
-- Additional Test clients can be started and joined into the running test: [java org.apache.qpid.interop.TestClient -j]
-
+ - Start a broker
+ - Start at least one Client [java org.apache.qpid.sustained.TestClient], ensuring unique naming
+ - Start Test Controller [java org.apache.qpid.sustained.TestCoordinator]
+ - Additional Test clients can be started:
+ [java org.apache.qpid.sustained.TestClient -j org.apache.qpid.sustained.SustainedTestClient]
diff --git a/java/integrationtests/pom.xml b/java/integrationtests/pom.xml
index 89fd5ede28..9ccd153f54 100644
--- a/java/integrationtests/pom.xml
+++ b/java/integrationtests/pom.xml
@@ -40,16 +40,12 @@
<dependencies>
+ <!-- These tests depend on the client API only. -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-client</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-systests</artifactId>
- </dependency>
-
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
new file mode 100644
index 0000000000..d2042be741
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
@@ -0,0 +1,263 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop.coordinator;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.util.ConversationFactory;
+
+import javax.jms.*;
+
+import java.util.Map;
+
+/**
+ * A CoordinatingTestCase is a JUnit test case extension that knows how to coordinate test clients that take part in a
+ * test case as defined in the interop testing specification
+ * (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification).
+ *
+ * <p/>The real logic of the test cases built on top of this, is embeded in the comparison of the sender and receiver
+ * reports. An example test method might look like:
+ *
+ * <p/><pre>
+ * public void testExample()
+ * {
+ * Properties testConfig = new Properties();
+ * testConfig.add("TEST_CASE", "example");
+ * ...
+ *
+ * Report[] reports = sequenceTest(testConfig);
+ *
+ * // Compare sender and receiver reports.
+ * if (report[0] ... report[1] ...)
+ * {
+ * Assert.fail("Sender and receiver reports did not match up.");
+ * }
+ * }
+ *
+ * </pre>
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Accept notification of test case participants. <td> {@link InvitingTestDecorator}
+ * <tr><td> Accpet JMS Connection to carry out the coordination over.
+ * <tr><td> Coordinate the test sequence amongst participants. <td> {@link ConversationFactory}
+ * <tr><td> Supply test properties
+ * </table>
+ */
+public abstract class CoordinatingTestCase extends TestCase
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(CoordinatingTestCase.class);
+
+ /** Holds the contact details for the sending test client. */
+ protected TestClientDetails sender;
+
+ /** Holds the contact details for the receving test client. */
+ protected TestClientDetails receiver;
+
+ /** Holds the conversation factory over which to coordinate the test. */
+ protected ConversationFactory conversationFactory;
+
+ /**
+ * Creates a new coordinating test case with the specified name.
+ *
+ * @param name The test case name.
+ */
+ public CoordinatingTestCase(String name)
+ {
+ super(name);
+ }
+
+ /**
+ * Sets the sender test client to coordinate the test with.
+ *
+ * @param sender The contact details of the sending client in the test.
+ */
+ public void setSender(TestClientDetails sender)
+ {
+ log.debug("public void setSender(TestClientDetails sender = " + sender + "): called");
+
+ this.sender = sender;
+ }
+
+ /**
+ * Sets the receiving test client to coordinate the test with.
+ *
+ * @param receiver The contact details of the sending client in the test.
+ */
+ public void setReceiver(TestClientDetails receiver)
+ {
+ log.debug("public void setReceiver(TestClientDetails receiver = " + receiver + "): called");
+
+ this.receiver = receiver;
+ }
+
+ /**
+ * Supplies the sending test client.
+ *
+ * @return The sending test client.
+ */
+ public TestClientDetails getSender()
+ {
+ return sender;
+ }
+
+ /**
+ * Supplies the receiving test client.
+ *
+ * @return The receiving test client.
+ */
+ public TestClientDetails getReceiver()
+ {
+ return receiver;
+ }
+
+ /**
+ * Returns the name of the current test method of this test class, with the sending and receiving client names
+ * appended on to it, so that the resulting name unqiuely identifies the test and the clients that participated
+ * in it.
+ *
+ * @return The unique test and client name.
+ */
+ public String getName()
+ {
+ if ((sender == null) || (receiver == null))
+ {
+ return super.getName();
+ }
+ else
+ {
+ return super.getName() + "_sender_" + sender.clientName + "_receiver_" + receiver.clientName;
+ }
+ }
+
+ /**
+ * Should provide a translation from the junit method name of a test to its test case name as defined in the
+ * interop testing specification. For example the method "testP2P" might map onto the interop test case name
+ * "TC2_BasicP2P".
+ *
+ * @param methodName The name of the JUnit test method.
+ *
+ * @return The name of the corresponding interop test case.
+ */
+ public abstract String getTestCaseNameForTestMethod(String methodName);
+
+ /**
+ * Accepts the conversation factory over which to hold the test coordinating conversation.
+ *
+ * @param conversationFactory The conversation factory to coordinate the test over.
+ */
+ public void setConversationFactory(ConversationFactory conversationFactory)
+ {
+ this.conversationFactory = conversationFactory;
+ }
+
+ /**
+ * Holds a test coordinating conversation with the test clients. This is the basic implementation of the inner
+ * loop of Use Case 5. It consists of assigning the test roles, begining the test and gathering the test reports
+ * from the participants.
+ *
+ * @param testProperties The test case definition.
+ *
+ * @return The test results from the senders and receivers.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ protected Message[] sequenceTest(Map<String, Object> testProperties) throws JMSException
+ {
+ log.debug("protected Message[] sequenceTest(Object... testProperties = " + testProperties + "): called");
+
+ Session session = conversationFactory.getSession();
+ Destination senderControlTopic = session.createTopic(sender.privateControlKey);
+ Destination receiverControlTopic = session.createTopic(receiver.privateControlKey);
+
+ ConversationFactory.Conversation senderConversation = conversationFactory.startConversation();
+ ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation();
+
+ // Assign the sender role to the sending test client.
+ Message assignSender = conversationFactory.getSession().createMessage();
+ setPropertiesOnMessage(assignSender, testProperties);
+ assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
+ assignSender.setStringProperty("ROLE", "SENDER");
+
+ senderConversation.send(senderControlTopic, assignSender);
+
+ // Assign the receiver role the receiving client.
+ Message assignReceiver = session.createMessage();
+ setPropertiesOnMessage(assignReceiver, testProperties);
+ assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
+ assignReceiver.setStringProperty("ROLE", "RECEIVER");
+
+ receiverConversation.send(receiverControlTopic, assignReceiver);
+
+ // Wait for the senders and receivers to confirm their roles.
+ senderConversation.receive();
+ receiverConversation.receive();
+
+ // Start the test.
+ Message start = session.createMessage();
+ start.setStringProperty("CONTROL_TYPE", "START");
+
+ senderConversation.send(senderControlTopic, start);
+
+ // Wait for the test sender to return its report.
+ Message senderReport = senderConversation.receive();
+
+ try
+ {
+ Thread.sleep(500);
+ }
+ catch (InterruptedException e)
+ { }
+
+ // Ask the receiver for its report.
+ Message statusRequest = session.createMessage();
+ statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST");
+
+ receiverConversation.send(receiverControlTopic, statusRequest);
+
+ // Wait for the receiver to send its report.
+ Message receiverReport = receiverConversation.receive();
+
+ return new Message[] { senderReport, receiverReport };
+ }
+
+ /**
+ * Sets properties of different types on a JMS Message.
+ *
+ * @param message The message to set properties on.
+ * @param properties The property name/value pairs to set.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ public void setPropertiesOnMessage(Message message, Map<String, Object> properties) throws JMSException
+ {
+ for (Map.Entry<String, Object> entry : properties.entrySet())
+ {
+ String name = entry.getKey();
+ Object value = entry.getValue();
+
+ message.setObjectProperty(name, value);
+ }
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java
new file mode 100644
index 0000000000..6eec20769a
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java
@@ -0,0 +1,388 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop.coordinator;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import javax.jms.*;
+import junit.framework.Test;
+import junit.framework.TestResult;
+import junit.framework.TestSuite;
+import org.apache.log4j.Logger;
+import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase1DummyRun;
+import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase2BasicP2P;
+import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase3BasicPubSub;
+import org.apache.qpid.interop.testclient.TestClient;
+import org.apache.qpid.util.CommandLineParser;
+import org.apache.qpid.util.ConversationFactory;
+import org.apache.qpid.util.PrettyPrintingUtils;
+import uk.co.thebadgerset.junit.extensions.TKTestResult;
+import uk.co.thebadgerset.junit.extensions.TKTestRunner;
+import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
+import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
+
+/**
+ * <p/>Implements the coordinator client described in the interop testing specification
+ * (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). This coordinator is built on
+ * top of the JUnit testing framework.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Find out what test clients are available. <td> {@link ConversationFactory}
+ * <tr><td> Decorate available tests to run all available clients. <td> {@link InvitingTestDecorator}
+ * <tr><td> Attach XML test result logger.
+ * <tr><td> Terminate the interop testing framework.
+ * </table>
+ */
+public class Coordinator extends TKTestRunner
+{
+ private static final Logger log = Logger.getLogger(Coordinator.class);
+
+ public static final String DEFAULT_CONNECTION_PROPS_RESOURCE = "org/apache/qpid/interop/connection.properties";
+
+ /** Holds the URL of the broker to coordinate the tests on. */
+ protected String brokerUrl;
+
+ /** Holds the virtual host to coordinate the tests on. If <tt>null</tt>, then the default virtual host is used. */
+ protected String virtualHost;
+
+ /** Holds the list of all clients that enlisted, when the compulsory invite was issued. */
+ protected Set<TestClientDetails> enlistedClients = new HashSet<TestClientDetails>();
+
+ /** Holds the conversation helper for the control conversation. */
+ protected ConversationFactory conversationFactory;
+
+ /** Holds the connection that the coordinating messages are sent over. */
+ protected Connection connection;
+
+ /**
+ * Holds the name of the class of the test currently being run. Ideally passed into the {@link #createTestResult}
+ * method, but as the signature is already fixed for this, the current value gets pushed here as a member variable.
+ */
+ protected String currentTestClassName;
+
+ /** Holds the path of the directory to output test results too, if one is defined. */
+ protected static String _reportDir;
+
+ /**
+ * Creates an interop test coordinator on the specified broker and virtual host.
+ *
+ * @param brokerUrl The URL of the broker to connect to.
+ * @param virtualHost The virtual host to run all tests on. Optional, may be <tt>null</tt>.
+ */
+ public Coordinator(String brokerUrl, String virtualHost)
+ {
+ log.debug("Coordinator(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
+
+ // Retain the connection parameters.
+ this.brokerUrl = brokerUrl;
+ this.virtualHost = virtualHost;
+ }
+
+ /**
+ * The entry point for the interop test coordinator. This client accepts the following command line arguments:
+ *
+ * <p/><table>
+ * <tr><td> -b <td> The broker URL. <td> Mandatory.
+ * <tr><td> -h <td> The virtual host. <td> Optional.
+ * <tr><td> name=value <td> Trailing argument define name/value pairs. Added to system properties. <td> Optional.
+ * </table>
+ *
+ * @param args The command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ try
+ {
+ // Use the command line parser to evaluate the command line with standard handling behaviour (print errors
+ // and usage then exit if there are errors).
+ Properties options =
+ CommandLineParser.processCommandLine(args,
+ new CommandLineParser(
+ new String[][]
+ {
+ {"b", "The broker URL.", "broker", "false"},
+ {"h", "The virtual host to use.", "virtual host", "false"},
+ {"o", "The name of the directory to output test timings to.", "dir", "false"}
+ }));
+
+ // Extract the command line options.
+ String brokerUrl = options.getProperty("b");
+ String virtualHost = options.getProperty("h");
+ _reportDir = options.getProperty("o");
+ _reportDir = (_reportDir == null) ? "." : _reportDir;
+
+ // Scan for available test cases using a classpath scanner.
+ Collection<Class<? extends CoordinatingTestCase>> testCaseClasses =
+ new ArrayList<Class<? extends CoordinatingTestCase>>();
+ // ClasspathScanner.getMatches(CoordinatingTestCase.class, "^Test.*", true);
+ // Hard code the test classes till the classpath scanner is fixed.
+ Collections.addAll(testCaseClasses,
+ CoordinatingTestCase1DummyRun.class,
+ CoordinatingTestCase2BasicP2P.class,
+ CoordinatingTestCase3BasicPubSub.class);
+
+ // Check that some test classes were actually found.
+ if (testCaseClasses.isEmpty())
+ {
+ throw new RuntimeException(
+ "No test classes implementing CoordinatingTestCase were found on the class path.");
+ }
+
+ int i = 0;
+ String[] testClassNames = new String[testCaseClasses.size()];
+
+ for (Class testClass : testCaseClasses)
+ {
+ testClassNames[i++] = testClass.getName();
+ }
+
+ // Create a coordinator and begin its test procedure.
+ Coordinator coordinator = new Coordinator(brokerUrl, virtualHost);
+
+ boolean failure = false;
+
+ TestResult testResult = coordinator.start(testClassNames);
+
+ if (failure)
+ {
+ System.exit(FAILURE_EXIT);
+ }
+ else
+ {
+ System.exit(SUCCESS_EXIT);
+ }
+ }
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ log.error("Top level handler caught execption.", e);
+ System.exit(EXCEPTION_EXIT);
+ }
+ }
+
+ /**
+ * Starts all of the test classes to be run by this coordinator running.
+ *
+ * @param testClassNames An array of all the coordinating test case implementations.
+ *
+ * @return A JUnit TestResult to run the tests with.
+ *
+ * @throws Exception Any underlying exceptions are allowed to fall through, and fail the test process.
+ */
+ public TestResult start(String[] testClassNames) throws Exception
+ {
+ log.debug("public TestResult start(String[] testClassNames = " + PrettyPrintingUtils.printArray(testClassNames)
+ + ": called");
+
+ // Connect to the broker.
+ connection = TestClient.createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination controlTopic = session.createTopic("iop.control");
+ Destination responseQueue = session.createQueue("coordinator");
+
+ conversationFactory = new ConversationFactory(connection, responseQueue, LinkedBlockingQueue.class);
+ ConversationFactory.Conversation conversation = conversationFactory.startConversation();
+
+ connection.start();
+
+ // Broadcast the compulsory invitation to find out what clients are available to test.
+ Message invite = session.createMessage();
+ invite.setStringProperty("CONTROL_TYPE", "INVITE");
+ invite.setJMSReplyTo(responseQueue);
+
+ conversation.send(controlTopic, invite);
+
+ // Wait for a short time, to give test clients an opportunity to reply to the invitation.
+ Collection<Message> enlists = conversation.receiveAll(0, 3000);
+
+ enlistedClients = extractEnlists(enlists);
+
+ // Run the test in the suite using JUnit.
+ TestResult result = null;
+
+ for (String testClassName : testClassNames)
+ {
+ // Record the current test class, so that the test results can be output to a file incorporating this name.
+ this.currentTestClassName = testClassName;
+
+ result = super.start(new String[]{testClassName});
+ }
+
+ // At this point in time, all tests have completed. Broadcast the shutdown message.
+ Message terminate = session.createMessage();
+ terminate.setStringProperty("CONTROL_TYPE", "TERMINATE");
+
+ conversation.send(controlTopic, terminate);
+
+ return result;
+ }
+
+ /**
+ * For a collection of enlist messages, this method pulls out of the client details for the enlisting clients.
+ *
+ * @param enlists The enlist messages.
+ *
+ * @return A set of enlisting clients, extracted from the enlist messages.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ public static Set<TestClientDetails> extractEnlists(Collection<Message> enlists) throws JMSException
+ {
+ log.debug("public static Set<TestClientDetails> extractEnlists(Collection<Message> enlists = " + enlists
+ + "): called");
+
+ Set<TestClientDetails> enlistedClients = new HashSet<TestClientDetails>();
+
+ // Retain the list of all available clients.
+ for (Message enlist : enlists)
+ {
+ TestClientDetails clientDetails = new TestClientDetails();
+ clientDetails.clientName = enlist.getStringProperty("CLIENT_NAME");
+ clientDetails.privateControlKey = enlist.getStringProperty("CLIENT_PRIVATE_CONTROL_KEY");
+
+ enlistedClients.add(clientDetails);
+ }
+
+ return enlistedClients;
+ }
+
+ /**
+ * Runs a test or suite of tests, using the super class implemenation. This method wraps the test to be run
+ * in any test decorators needed to add in the coordinators ability to invite test clients to participate in
+ * tests.
+ *
+ * @param test The test to run.
+ * @param wait Undocumented. Nothing in the JUnit javadocs to say what this is for.
+ *
+ * @return The results of the test run.
+ */
+ public TestResult doRun(Test test, boolean wait)
+ {
+ log.debug("public TestResult doRun(Test \"" + test + "\", boolean " + wait + "): called");
+
+ // Wrap all tests in the test suite with WrappedSuiteTestDecorators. This is quite ugly and a bit baffling,
+ // but the reason it is done is because the JUnit implementation of TestDecorator has some bugs in it.
+ WrappedSuiteTestDecorator targetTest = null;
+
+ if (test instanceof TestSuite)
+ {
+ log.debug("targetTest is a TestSuite");
+
+ TestSuite suite = (TestSuite) test;
+
+ int numTests = suite.countTestCases();
+ log.debug("There are " + numTests + " in the suite.");
+
+ for (int i = 0; i < numTests; i++)
+ {
+ Test nextTest = suite.testAt(i);
+ log.debug("suite.testAt(" + i + ") = " + nextTest);
+
+ if (nextTest instanceof CoordinatingTestCase)
+ {
+ log.debug("nextTest is a CoordinatingTestCase");
+ }
+ }
+
+ targetTest = new WrappedSuiteTestDecorator(suite);
+ log.debug("Wrapped with a WrappedSuiteTestDecorator.");
+ }
+ // Wrap the tests in an inviting test decorator, to perform the invite/test cycle.
+
+ targetTest = newTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
+
+ TestSuite suite = new TestSuite();
+ suite.addTest(targetTest);
+
+ // Wrap the tests in a scaled test decorator to them them as a 'batch' in one thread.
+ // targetTest = new ScaledTestDecorator(targetTest, new int[] { 1 });
+
+ return super.doRun(suite, wait);
+ }
+
+ protected WrappedSuiteTestDecorator newTestDecorator(WrappedSuiteTestDecorator targetTest, Set<TestClientDetails> enlistedClients, ConversationFactory conversationFactory, Connection connection)
+ {
+ return new InvitingTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
+ }
+
+ /**
+ * Creates the TestResult object to be used for test runs.
+ *
+ * @return An instance of the test result object.
+ */
+ protected TestResult createTestResult()
+ {
+ log.debug("protected TestResult createTestResult(): called");
+
+ TKTestResult result = new TKTestResult(fPrinter.getWriter(), delay, verbose, testCaseName);
+
+ // Check if a directory to output reports to has been specified and attach test listeners if so.
+ if (_reportDir != null)
+ {
+ // Create the report directory if it does not already exist.
+ File reportDirFile = new File(_reportDir);
+
+ if (!reportDirFile.exists())
+ {
+ reportDirFile.mkdir();
+ }
+
+ // Create the timings file (make the name of this configurable as a command line parameter).
+ Writer timingsWriter = null;
+
+ try
+ {
+ File timingsFile = new File(reportDirFile, "TEST." + currentTestClassName + ".xml");
+ timingsWriter = new BufferedWriter(new FileWriter(timingsFile), 20000);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Unable to create the log file to write test results to: " + e, e);
+ }
+
+ // Set up a CSV results listener to output the timings to the results file.
+ XMLTestListener listener = new XMLTestListener(timingsWriter, currentTestClassName);
+ result.addListener(listener);
+ result.addTKTestListener(listener);
+
+ // Register the results listeners shutdown hook to flush its data if the test framework is shutdown
+ // prematurely.
+ // registerShutdownHook(listener);
+
+ // Record the start time of the batch.
+ // result.notifyStartBatch();
+
+ // At this point in time the test class has been instantiated, giving it an opportunity to read its parameters.
+ // Inform any test listers of the test properties.
+ result.notifyTestProperties(TestContextProperties.getAccessedProps());
+ }
+
+ return result;
+ }
+
+ public void setReportDir(String reportDir)
+ {
+ _reportDir = reportDir;
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java
new file mode 100644
index 0000000000..8695f7f66f
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java
@@ -0,0 +1,220 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop.coordinator;
+
+import java.util.*;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import junit.framework.Test;
+import junit.framework.TestResult;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.util.ConversationFactory;
+
+import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
+
+/**
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Broadcast test invitations and collect enlists. <td> {@link ConversationFactory}.
+ * <tr><td> Output test failures for clients unwilling to run the test case. <td> {@link Coordinator}
+ * <tr><td> Execute coordinated test cases. <td> {@link CoordinatingTestCase}
+ * </table>
+ */
+public class InvitingTestDecorator extends WrappedSuiteTestDecorator
+{
+ private static final Logger log = Logger.getLogger(InvitingTestDecorator.class);
+
+ /** Holds the contact information for all test clients that are available and that may take part in the test. */
+ Set<TestClientDetails> allClients;
+
+ /** Holds the conversation helper for the control level conversation for coordinating the test through. */
+ ConversationFactory conversationFactory;
+
+ /** Holds the connection that the control conversation is held over. */
+ Connection connection;
+
+ /** Holds the underlying {@link CoordinatingTestCase}s that this decorator wraps. */
+ WrappedSuiteTestDecorator testSuite;
+
+ /**
+ * Creates a wrapped suite test decorator from another one.
+ *
+ * @param suite The test suite.
+ * @param availableClients The list of all clients that responded to the compulsory invite.
+ * @param controlConversation The conversation helper for the control level, test coordination conversation.
+ * @param controlConnection The connection that the coordination messages are sent over.
+ */
+ public InvitingTestDecorator(WrappedSuiteTestDecorator suite, Set<TestClientDetails> availableClients,
+ ConversationFactory controlConversation, Connection controlConnection)
+ {
+ super(suite);
+
+ log.debug("public InvitingTestDecorator(WrappedSuiteTestDecorator suite, Set<TestClientDetails> allClients = "
+ + availableClients + ", ConversationHelper controlConversation = " + controlConversation + "): called");
+
+ testSuite = suite;
+ allClients = availableClients;
+ conversationFactory = controlConversation;
+ connection = controlConnection;
+ }
+
+ /**
+ * Broadcasts a test invitation and accetps enlisting from participating clients. The wrapped test case is
+ * then repeated for every combination of test clients (provided the wrapped test case extends
+ * {@link CoordinatingTestCase}.
+ *
+ * <p/>Any JMSExceptions during the invite/enlist conversation will be allowed to fall through as runtime exceptions,
+ * resulting in the non-completion of the test run.
+ *
+ * @todo Better error recovery for failure of the invite/enlist conversation could be added.
+ *
+ * @param testResult The the results object to monitor the test results with.
+ */
+ public void run(TestResult testResult)
+ {
+ log.debug("public void run(TestResult testResult): called");
+
+ Collection<Test> tests = testSuite.getAllUnderlyingTests();
+
+ for (Test test : tests)
+ {
+ CoordinatingTestCase coordTest = (CoordinatingTestCase) test;
+
+ // Broadcast the invitation to find out what clients are available to test.
+ Set<TestClientDetails> enlists;
+ try
+ {
+ Message invite = conversationFactory.getSession().createMessage();
+ Destination controlTopic = conversationFactory.getSession().createTopic("iop.control");
+ ConversationFactory.Conversation conversation = conversationFactory.startConversation();
+
+ invite.setStringProperty("CONTROL_TYPE", "INVITE");
+ invite.setStringProperty("TEST_NAME", coordTest.getTestCaseNameForTestMethod(coordTest.getName()));
+
+ conversation.send(controlTopic, invite);
+
+ // Wait for a short time, to give test clients an opportunity to reply to the invitation.
+ Collection<Message> replies = conversation.receiveAll(allClients.size(), 3000);
+ enlists = Coordinator.extractEnlists(replies);
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException("There was a JMSException during the invite/enlist conversation.", e);
+ }
+
+ // Compare the list of willing clients to the list of all available.
+ Set<TestClientDetails> optOuts = new HashSet<TestClientDetails>(allClients);
+ optOuts.removeAll(enlists);
+
+ // Output test failures for clients that will not particpate in the test.
+ Set<List<TestClientDetails>> failPairs = allPairs(optOuts, allClients);
+
+ for (List<TestClientDetails> failPair : failPairs)
+ {
+ CoordinatingTestCase failTest = new OptOutTestCase("testOptOut");
+ failTest.setSender(failPair.get(0));
+ failTest.setReceiver(failPair.get(1));
+
+ failTest.run(testResult);
+ }
+
+ // Loop over all combinations of clients, willing to run the test.
+ Set<List<TestClientDetails>> enlistedPairs = allPairs(enlists, enlists);
+
+ for (List<TestClientDetails> enlistedPair : enlistedPairs)
+ {
+ // Set the sending and receiving client details on the test case.
+ coordTest.setSender(enlistedPair.get(0));
+ coordTest.setReceiver(enlistedPair.get(1));
+
+ // Pass down the connection to hold the coordination conversation over.
+ coordTest.setConversationFactory(conversationFactory);
+
+ // Execute the test case.
+ coordTest.run(testResult);
+ }
+ }
+ }
+
+ /**
+ * Prints a string summarizing this test decorator, mainly for debugging purposes.
+ *
+ * @return String representation for debugging purposes.
+ */
+ public String toString()
+ {
+ return "InvitingTestDecorator: [ testSuite = " + testSuite + " ]";
+ }
+
+ /**
+ * Produces all pairs of combinations of elements from two sets. The ordering of the elements in the pair is
+ * important, that is the pair <l, r> is distinct from <r, l>; both pairs are generated. For any element, i, in
+ * both the left and right sets, the reflexive pair <i, i> is not generated.
+ *
+ * @param left The left set.
+ * @param right The right set.
+ *
+ * @return All pairs formed from the permutations of all elements of the left and right sets.
+ */
+ private <E> Set<List<E>> allPairs(Set<E> left, Set<E> right)
+ {
+ log.debug("private <E> Set<List<E>> allPairs(Set<E> left = " + left + ", Set<E> right = " + right + "): called");
+
+ Set<List<E>> results = new HashSet<List<E>>();
+
+ // Form all pairs from left to right.
+ // Form all pairs from right to left.
+ for (E le : left)
+ {
+ for (E re : right)
+ {
+ if (!le.equals(re))
+ {
+ results.add(new Pair<E>(le, re));
+ results.add(new Pair<E>(re, le));
+ }
+ }
+ }
+
+ log.debug("results = " + results);
+
+ return results;
+ }
+
+ /**
+ * A simple implementation of a pair, using a list.
+ */
+ private class Pair<T> extends ArrayList<T>
+ {
+ public Pair(T first, T second)
+ {
+ super();
+ super.add(first);
+ super.add(second);
+ }
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java
new file mode 100644
index 0000000000..1b4461f8c2
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.interop.coordinator;
+
+import javax.jms.Message;
+
+public interface ListeningCoordinatorTest
+{
+ public void latejoin(Message message);
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java
new file mode 100644
index 0000000000..4312dfbcc6
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java
@@ -0,0 +1,200 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop.coordinator;
+
+import junit.framework.Test;
+import junit.framework.TestResult;
+import org.apache.log4j.Logger;
+import org.apache.qpid.util.ConversationFactory;
+import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * <p><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Broadcast test
+ * invitations and collect enlists. <td> {@link ConversationFactory}. <tr><td> Output test failures for clients
+ * unwilling to run the test case. <td> {@link Coordinator} <tr><td> Execute coordinated test cases. <td> {@link
+ * CoordinatingTestCase} </table>
+ */
+public class ListeningTestDecorator extends WrappedSuiteTestDecorator implements MessageListener
+{
+ private static final Logger log = Logger.getLogger(ListeningTestDecorator.class);
+
+ /** Holds the contact information for all test clients that are available and that may take part in the test. */
+ Set<TestClientDetails> allClients;
+
+ /** Holds the conversation helper for the control level conversation for coordinating the test through. */
+ ConversationFactory conversationFactory;
+
+ /** Holds the connection that the control conversation is held over. */
+ Connection connection;
+
+ /** Holds the underlying {@link CoordinatingTestCase}s that this decorator wraps. */
+ WrappedSuiteTestDecorator testSuite;
+
+ /** Hold the current running test case. */
+ CoordinatingTestCase _currentTest = null;
+
+ /**
+ * Creates a wrapped suite test decorator from another one.
+ *
+ * @param suite The test suite.
+ * @param availableClients The list of all clients that responded to the compulsory invite.
+ * @param controlConversation The conversation helper for the control level, test coordination conversation.
+ * @param controlConnection The connection that the coordination messages are sent over.
+ */
+ public ListeningTestDecorator(WrappedSuiteTestDecorator suite, Set<TestClientDetails> availableClients,
+ ConversationFactory controlConversation, Connection controlConnection)
+ {
+ super(suite);
+
+ log.debug("public InvitingTestDecorator(WrappedSuiteTestDecorator suite, Set<TestClientDetails> allClients = "
+ + availableClients + ", ConversationHelper controlConversation = " + controlConversation + "): called");
+
+ testSuite = suite;
+ allClients = availableClients;
+ conversationFactory = controlConversation;
+ connection = controlConnection;
+ }
+
+ /**
+ * Broadcasts a test invitation and accetps enlisting from participating clients. The wrapped test case is then
+ * repeated for every combination of test clients (provided the wrapped test case extends {@link
+ * CoordinatingTestCase}.
+ *
+ * <p/>Any JMSExceptions during the invite/enlist conversation will be allowed to fall through as runtime
+ * exceptions, resulting in the non-completion of the test run.
+ *
+ * @param testResult The the results object to monitor the test results with.
+ *
+ * @todo Better error recovery for failure of the invite/enlist conversation could be added.
+ */
+ public void run(TestResult testResult)
+ {
+ log.debug("public void run(TestResult testResult): called");
+
+ Collection<Test> tests = testSuite.getAllUnderlyingTests();
+
+ for (Test test : tests)
+ {
+ CoordinatingTestCase coordTest = (CoordinatingTestCase) test;
+
+ Set<TestClientDetails> enlists = signupClients(coordTest);
+
+ if (enlists.size() == 0)
+ {
+ throw new RuntimeException("No clients to test with");
+ }
+
+ Iterator<TestClientDetails> clients = enlists.iterator();
+ coordTest.setSender(clients.next());
+
+ while (clients.hasNext())
+ {
+ // Set the sending and receiving client details on the test case.
+ coordTest.setReceiver(clients.next());
+ }
+
+ // Pass down the connection to hold the coordination conversation over.
+ coordTest.setConversationFactory(conversationFactory);
+
+
+ if (coordTest instanceof ListeningCoordinatorTest)
+ {
+ _currentTest = coordTest;
+ }
+ // Execute the test case.
+ coordTest.run(testResult);
+
+ _currentTest = null;
+ }
+ }
+
+ private Set<TestClientDetails> signupClients(CoordinatingTestCase coordTest)
+ {
+ // Broadcast the invitation to find out what clients are available to test.
+ Set<TestClientDetails> enlists;
+ try
+ {
+ Message invite = conversationFactory.getSession().createMessage();
+ Destination controlTopic = conversationFactory.getSession().createTopic("iop.control");
+ ConversationFactory.Conversation conversation = conversationFactory.startConversation();
+
+ invite.setStringProperty("CONTROL_TYPE", "INVITE");
+ invite.setStringProperty("TEST_NAME", coordTest.getTestCaseNameForTestMethod(coordTest.getName()));
+
+ conversation.send(controlTopic, invite);
+
+ // Wait for a short time, to give test clients an opportunity to reply to the invitation.
+ Collection<Message> replies = conversation.receiveAll(allClients.size(), 5000);
+
+ log.debug("Received " + replies.size() + " enlist replies");
+
+ enlists = Coordinator.extractEnlists(replies);
+
+ //Create topic to listen on for latejoiners
+ Destination listenTopic = conversationFactory.getSession().createTopic("iop.control.test." + coordTest.getTestCaseNameForTestMethod(coordTest.getName()));
+
+ //Listen for joiners
+ conversationFactory.getSession().createConsumer(listenTopic).setMessageListener(this);
+ log.debug("Created consumer on :" + listenTopic);
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException("There was a JMSException during the invite/enlist conversation.", e);
+ }
+
+ return enlists;
+ }
+
+ /**
+ * Prints a string summarizing this test decorator, mainly for debugging purposes.
+ *
+ * @return String representation for debugging purposes.
+ */
+ public String toString()
+ {
+ return "ListeningTestDecorator: [ testSuite = " + testSuite + " ]";
+ }
+
+
+ public void onMessage(Message message)
+ {
+ try
+ {
+ if (message.getStringProperty("CONTROL_TYPE").equals("LATEJOIN"))
+ {
+ ((ListeningCoordinatorTest) _currentTest).latejoin(message);
+ }
+ }
+ catch (JMSException e)
+ {
+ log.debug("Unable to process message:" + message);
+ }
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java
new file mode 100644
index 0000000000..42a382a898
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop.coordinator;
+
+import junit.framework.Assert;
+
+/**
+ * An OptOutTestCase is a test case that automatically fails. It is used when a list of test clients has been generated
+ * from a compulsory invite, but only some of those clients have responded to a specific test case invite. The clients
+ * that did not respond, are automatically given a fail for the test.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Fail the test with a suitable reason.
+ * </table>
+ */
+public class OptOutTestCase extends CoordinatingTestCase
+{
+ /**
+ * Creates a new coordinating test case with the specified name.
+ *
+ * @param name The test case name.
+ */
+ public OptOutTestCase(String name)
+ {
+ super(name);
+ }
+
+ /** Generates an appropriate test failure assertion. */
+ public void testOptOut()
+ {
+ Assert.fail("One of " + getSender() + " and " + getReceiver() + " opted out of the test.");
+ }
+
+ /**
+ * Should provide a translation from the junit method name of a test to its test case name as defined in the
+ * interop testing specification. For example the method "testP2P" might map onto the interop test case name
+ * "TC2_BasicP2P".
+ *
+ * @param methodName The name of the JUnit test method.
+ * @return The name of the corresponding interop test case.
+ */
+ public String getTestCaseNameForTestMethod(String methodName)
+ {
+ return "OptOutTest";
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java
new file mode 100644
index 0000000000..c4a9d39cd8
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop.coordinator;
+
+/**
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ */
+public class TestClientDetails
+{
+ /** The test clients name. */
+ public String clientName;
+
+ /* The test clients unique sequence number. Not currently used. */
+
+ /** The routing key of the test clients control topic. */
+ public String privateControlKey;
+
+ /**
+ * Two TestClientDetails are considered to be equal, iff they have the same client name.
+ *
+ * @param o The object to compare to.
+ *
+ * @return <tt>If the object to compare to is a TestClientDetails equal to this one, <tt>false</tt> otherwise.
+ */
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+
+ if (!(o instanceof TestClientDetails))
+ {
+ return false;
+ }
+
+ final TestClientDetails testClientDetails = (TestClientDetails) o;
+
+ if ((clientName != null) ? (!clientName.equals(testClientDetails.clientName))
+ : (testClientDetails.clientName != null))
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Computes a hash code compatible with the equals method; based on the client name alone.
+ *
+ * @return A hash code for this.
+ */
+ public int hashCode()
+ {
+ return ((clientName != null) ? clientName.hashCode() : 0);
+ }
+
+ /**
+ * Outputs the client name and address details. Mostly used for debugging purposes.
+ *
+ * @return The client name and address.
+ */
+ public String toString()
+ {
+ return "TestClientDetails: [ clientName = " + clientName + ", privateControlKey = " + privateControlKey + " ]";
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java
new file mode 100644
index 0000000000..747ba0dd0b
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java
@@ -0,0 +1,402 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.interop.coordinator;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Writer;
+import java.util.*;
+
+import junit.framework.AssertionFailedError;
+import junit.framework.Test;
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+
+import uk.co.thebadgerset.junit.extensions.listeners.TKTestListener;
+
+/**
+ * Listens for test results for a named test and outputs these in the standard JUnit XML format to the specified
+ * writer.
+ *
+ * <p/>The API for this listener accepts notifications about different aspects of a tests results through different
+ * methods, so some assumption needs to be made as to which test result a notification refers to. For example
+ * {@link #startTest} will be called, then possibly {@link #timing} will be called, even though the test instance is
+ * passed in both cases, it is not enough to distinguish a particular run of the test, as the test case instance may
+ * be being shared between multiple threads, or being run a repeated number of times, and can therfore be re-used
+ * between calls. The listeners make the assumption that, for every test, a unique thread will call {@link #startTest}
+ * and {@link #endTest} to delimit each test. All calls to set test parameters, timings, state and so on, will occur
+ * between the start and end and will be given with the same thread id as the start and end, so the thread id provides
+ * a unqiue value to identify a particular test run against.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ *
+ * @todo Merge this class with CSV test listener, making the collection of results common to both, and only factoring
+ * out the results printing code into sub-classes. Provide a simple XML results formatter with the same format as
+ * the ant XML formatter, and a more structured one for outputing results with timings and summaries from
+ * performance tests.
+ */
+public class XMLTestListener implements TKTestListener
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(XMLTestListener.class);
+
+ /** The results file writer. */
+ protected Writer writer;
+
+ /** Holds the results for individual tests. */
+ // protected Map<Result, Result> results = new LinkedHashMap<Result, Result>();
+ // protected List<Result> results = new ArrayList<Result>();
+
+ /**
+ * Map for holding results on a per thread basis as they come in. A ThreadLocal is not used as sometimes an
+ * explicit thread id must be used, where notifications come from different threads than the ones that called
+ * the test method.
+ */
+ Map<Long, Result> threadLocalResults = Collections.synchronizedMap(new LinkedHashMap<Long, Result>());
+
+ /**
+ * Holds results for tests that have ended. Transferring these results here from the per-thread results map, means
+ * that the thread id is freed for the thread to generate more results.
+ */
+ List<Result> results = new ArrayList<Result>();
+
+ /** Holds the overall error count. */
+ protected int errors = 0;
+
+ /** Holds the overall failure count. */
+ protected int failures = 0;
+
+ /** Holds the overall tests run count. */
+ protected int runs = 0;
+
+ /** Holds the name of the class that tests are being run for. */
+ String testClassName;
+
+ /**
+ * Creates a new XML results output listener that writes to the specified location.
+ *
+ * @param writer The location to write results to.
+ */
+ public XMLTestListener(Writer writer, String testClassName)
+ {
+ log.debug("public XMLTestListener(Writer writer, String testClassName = " + testClassName + "): called");
+
+ this.writer = writer;
+ this.testClassName = testClassName;
+ }
+
+ /**
+ * Resets the test results to the default state of time zero, memory usage zero, parameter zero, test passed.
+ *
+ * @param test The test to resest any results for.
+ * @param threadId Optional thread id if not calling from thread that started the test method. May be null.
+ */
+ public void reset(Test test, Long threadId)
+ {
+ log.debug("public void reset(Test test = " + test + ", Long threadId = " + threadId + "): called");
+
+ XMLTestListener.Result r =
+ (threadId == null) ? threadLocalResults.get(Thread.currentThread().getId()) : threadLocalResults.get(threadId);
+
+ r.error = null;
+ r.failure = null;
+
+ }
+
+ /**
+ * A test started.
+ */
+ public void startTest(Test test)
+ {
+ log.debug("public void startTest(Test test = " + test + "): called");
+
+ Result newResult = new Result(test.getClass().getName(), ((TestCase) test).getName());
+
+ // Initialize the thread local test results.
+ threadLocalResults.put(Thread.currentThread().getId(), newResult);
+ runs++;
+ }
+
+ /**
+ * Should be called every time a test completes with the run time of that test.
+ *
+ * @param test The name of the test.
+ * @param nanos The run time of the test in nanoseconds.
+ * @param threadId Optional thread id if not calling from thread that started the test method. May be null.
+ */
+ public void timing(Test test, long nanos, Long threadId)
+ { }
+
+ /**
+ * Should be called every time a test completed with the amount of memory used before and after the test was run.
+ *
+ * @param test The test which memory was measured for.
+ * @param memStart The total JVM memory used before the test was run.
+ * @param memEnd The total JVM memory used after the test was run.
+ * @param threadId Optional thread id if not calling from thread that started the test method. May be null.
+ */
+ public void memoryUsed(Test test, long memStart, long memEnd, Long threadId)
+ { }
+
+ /**
+ * Should be called every time a parameterized test completed with the int value of its test parameter.
+ *
+ * @param test The test which memory was measured for.
+ * @param parameter The int parameter value.
+ * @param threadId Optional thread id if not calling from thread that started the test method. May be null.
+ */
+ public void parameterValue(Test test, int parameter, Long threadId)
+ { }
+
+ /**
+ * Should be called every time a test completes with the current number of test threads running.
+ *
+ * @param test The test for which the measurement is being generated.
+ * @param threads The number of tests being run concurrently.
+ * @param threadId Optional thread id if not calling from thread that started the test method. May be null.
+ */
+ public void concurrencyLevel(Test test, int threads, Long threadId)
+ { }
+
+ /**
+ * Notifies listeners of the tests read/set properties.
+ *
+ * @param properties The tests read/set properties.
+ */
+ public void properties(Properties properties)
+ { }
+
+ /**
+ * A test ended.
+ */
+ public void endTest(Test test)
+ {
+ log.debug("public void endTest(Test test = " + test + "): called");
+
+ // Move complete test results into the completed tests list.
+ Result r = threadLocalResults.get(Thread.currentThread().getId());
+ results.add(r);
+
+ // Clear all the test results for the thread.
+ threadLocalResults.remove(Thread.currentThread().getId());
+ }
+
+ /**
+ * Called when a test completes. Success, failure and errors. This method should be used when registering an
+ * end test from a different thread than the one that started the test.
+ *
+ * @param test The test which completed.
+ * @param threadId Optional thread id if not calling from thread that started the test method. May be null.
+ */
+ public void endTest(Test test, Long threadId)
+ {
+ log.debug("public void endTest(Test test = " + test + ", Long threadId = " + threadId + "): called");
+
+ // Move complete test results into the completed tests list.
+ Result r =
+ (threadId == null) ? threadLocalResults.get(Thread.currentThread().getId()) : threadLocalResults.get(threadId);
+ results.add(r);
+
+ // Clear all the test results for the thread.
+ threadLocalResults.remove(Thread.currentThread().getId());
+ }
+
+ /**
+ * An error occurred.
+ */
+ public void addError(Test test, Throwable t)
+ {
+ log.debug("public void addError(Test test = " + test + ", Throwable t = " + t + "): called");
+
+ Result r = threadLocalResults.get(Thread.currentThread().getId());
+ r.error = t;
+ errors++;
+ }
+
+ /**
+ * A failure occurred.
+ */
+ public void addFailure(Test test, AssertionFailedError t)
+ {
+ log.debug("public void addFailure(Test test = " + test + ", AssertionFailedError t = " + t + "): called");
+
+ Result r = threadLocalResults.get(Thread.currentThread().getId());
+ r.failure = t;
+ failures++;
+ }
+
+ /**
+ * Called when a test completes to mark it as a test fail. This method should be used when registering a
+ * failure from a different thread than the one that started the test.
+ *
+ * @param test The test which failed.
+ * @param e The assertion that failed the test.
+ * @param threadId Optional thread id if not calling from thread that started the test method. May be null.
+ */
+ public void addFailure(Test test, AssertionFailedError e, Long threadId)
+ {
+ log.debug("public void addFailure(Test test, AssertionFailedError e, Long threadId): called");
+
+ Result r =
+ (threadId == null) ? threadLocalResults.get(Thread.currentThread().getId()) : threadLocalResults.get(threadId);
+ r.failure = e;
+ failures++;
+ }
+
+ /**
+ * Notifies listeners of the start of a complete run of tests.
+ */
+ public void startBatch()
+ {
+ log.debug("public void startBatch(): called");
+
+ // Reset all results counts.
+ threadLocalResults = Collections.synchronizedMap(new HashMap<Long, Result>());
+ errors = 0;
+ failures = 0;
+ runs = 0;
+
+ // Write out the file header.
+ try
+ {
+ writer.write("<?xml version=\"1.0\" ?>\n");
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Unable to write the test results.", e);
+ }
+ }
+
+ /**
+ * Notifies listeners of the end of a complete run of tests.
+ *
+ * @param parameters The optional test parameters to log out with the batch results.
+ */
+ public void endBatch(Properties parameters)
+ {
+ log.debug("public void endBatch(Properties parameters = " + parameters + "): called");
+
+ // Write out the results.
+ try
+ {
+ // writer.write("<?xml version=\"1.0\" ?>\n");
+ writer.write("<testsuite errors=\"" + errors + "\" failures=\"" + failures + "\" tests=\"" + runs + "\" name=\""
+ + testClassName + "\">\n");
+
+ for (Result result : results)
+ {
+ writer.write(" <testcase classname=\"" + result.testClass + "\" name=\"" + result.testName + "\">\n");
+
+ if (result.error != null)
+ {
+ writer.write(" <error type=\"" + result.error.getClass() + "\">");
+ result.error.printStackTrace(new PrintWriter(writer));
+ writer.write(" </error>");
+ }
+ else if (result.failure != null)
+ {
+ writer.write(" <failure type=\"" + result.failure.getClass() + "\">");
+ result.failure.printStackTrace(new PrintWriter(writer));
+ writer.write(" </failure>");
+ }
+
+ writer.write(" </testcase>\n");
+ }
+
+ writer.write("</testsuite>\n");
+ writer.flush();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Unable to write the test results.", e);
+ }
+ }
+
+ /**
+ * Used to capture the results of a particular test run.
+ */
+ protected static class Result
+ {
+ public Result(String testClass, String testName)
+ {
+ this.testClass = testClass;
+ this.testName = testName;
+ }
+
+ public String testClass;
+ public String testName;
+
+ /** Holds the exception that caused error in this test. */
+ public Throwable error;
+
+ /** Holds the assertion exception that caused failure in this test. */
+ public AssertionFailedError failure;
+
+ /** Holds the error count for this test. */
+ // public int errors = 0;
+
+ /** Holds the failure count for this tests. */
+ // public int failures = 0;
+
+ /** Holds the overall tests run count for this test. */
+ // public int runs = 0;
+
+ /*public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+
+ if (!(o instanceof Result))
+ {
+ return false;
+ }
+
+ final Result result = (Result) o;
+
+ if ((testClass != null) ? (!testClass.equals(result.testClass)) : (result.testClass != null))
+ {
+ return false;
+ }
+
+ if ((testName != null) ? (!testName.equals(result.testName)) : (result.testName != null))
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ public int hashCode()
+ {
+ int result;
+ result = ((testClass != null) ? testClass.hashCode() : 0);
+ result = (29 * result) + ((testName != null) ? testName.hashCode() : 0);
+
+ return result;
+ }*/
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase1DummyRun.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java
index 73e08b578e..e642ef792b 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase1DummyRun.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java
@@ -18,55 +18,56 @@
* under the License.
*
*/
-package org.apache.qpid.interop.testcases;
-import org.apache.log4j.Logger;
+package org.apache.qpid.interop.coordinator.testcases;
+
+import java.util.HashMap;
+import java.util.Map;
-import org.apache.qpid.test.framework.distributedtesting.DistributedTestCase;
+import javax.jms.Message;
-import java.util.Properties;
+import junit.framework.Assert;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.interop.coordinator.CoordinatingTestCase;
/**
- * Coordinates test case 1, from the interop test specification. This test connects up the sender and receivers roles,
- * and gets some dummy test reports from them, in order to check that the test framework itself is operational.
- *
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Exercises the interop testing framework without actually sending any test messages.
- * <td> {@link org.apache.qpid.test.framework.distributedtesting.DistributedTestCase}
+ * <td> {@link org.apache.qpid.interop.coordinator.CoordinatingTestCase}
* </table>
*/
-public class InteropTestCase1DummyRun extends DistributedTestCase
+public class CoordinatingTestCase1DummyRun extends CoordinatingTestCase
{
/** Used for debugging. */
- private static final Logger log = Logger.getLogger(InteropTestCase1DummyRun.class);
+ private static final Logger log = Logger.getLogger(CoordinatingTestCase1DummyRun.class);
/**
* Creates a new coordinating test case with the specified name.
*
* @param name The test case name.
*/
- public InteropTestCase1DummyRun(String name)
+ public CoordinatingTestCase1DummyRun(String name)
{
super(name);
}
/**
* Performs the basic P2P test case, "Test Case 2" in the specification.
- *
- * @throws Exception Any exceptions are allowed to fall through and fail the test.
*/
public void testDummyRun() throws Exception
{
log.debug("public void testDummyRun(): called");
- Properties testConfig = new Properties();
+ Map<String, Object> testConfig = new HashMap<String, Object>();
testConfig.put("TEST_NAME", "TC1_DummyRun");
- /*Message[] reports =*/ getTestSequencer().sequenceTest(null, null, testConfig);
+ Message[] reports = sequenceTest(testConfig);
- // Compare sender and receivers reports.
- // Assert.assertEquals("Expected to get 2 dummy reports.", 2, reports.length);
+ // Compare sender and receiver reports.
+ Assert.assertEquals("Expected to get 2 dummy reports.", 2, reports.length);
}
/**
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase2BasicP2P.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java
index f77bbf032f..b1b2d9f847 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase2BasicP2P.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java
@@ -18,61 +18,61 @@
* under the License.
*
*/
-package org.apache.qpid.interop.testcases;
-import org.apache.log4j.Logger;
+package org.apache.qpid.interop.coordinator.testcases;
+
+import java.util.HashMap;
+import java.util.Map;
-import org.apache.qpid.test.framework.distributedtesting.DistributedTestCase;
+import javax.jms.Message;
-import java.util.Properties;
+import junit.framework.Assert;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.interop.coordinator.CoordinatingTestCase;
/**
- * Implements test case 2, from the interop test specification. This test sets up the TC2_BasicP2P test for 50
- * messages. It checks that the sender and receivers reports both indicate that all the test messages were transmitted
- * successfully.
- *
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Setup p2p test parameters and compare with test output. <td> {@link DistributedTestCase}
+ * <tr><td> Setup p2p test parameters and compare with test output. <td> {@link CoordinatingTestCase}
* </table>
*/
-public class InteropTestCase2BasicP2P extends DistributedTestCase
+public class CoordinatingTestCase2BasicP2P extends CoordinatingTestCase
{
/** Used for debugging. */
- private static final Logger log = Logger.getLogger(InteropTestCase2BasicP2P.class);
+ private static final Logger log = Logger.getLogger(CoordinatingTestCase2BasicP2P.class);
/**
* Creates a new coordinating test case with the specified name.
*
* @param name The test case name.
*/
- public InteropTestCase2BasicP2P(String name)
+ public CoordinatingTestCase2BasicP2P(String name)
{
super(name);
}
/**
* Performs the basic P2P test case, "Test Case 2" in the specification.
- *
- * @throws Exception Any exceptions are allowed to fall through and fail the test.
*/
public void testBasicP2P() throws Exception
{
log.debug("public void testBasicP2P(): called");
- Properties testConfig = new Properties();
- testConfig.setProperty("TEST_NAME", "TC2_BasicP2P");
- testConfig.setProperty("P2P_QUEUE_AND_KEY_NAME", "tc2queue");
+ Map<String, Object> testConfig = new HashMap<String, Object>();
+ testConfig.put("TEST_NAME", "TC2_BasicP2P");
+ testConfig.put("P2P_QUEUE_AND_KEY_NAME", "tc2queue");
testConfig.put("P2P_NUM_MESSAGES", 50);
- /*Message[] reports =*/ getTestSequencer().sequenceTest(null, null, testConfig);
+ Message[] reports = sequenceTest(testConfig);
- // Compare sender and receivers reports.
- /*int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT");
+ // Compare sender and receiver reports.
+ int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT");
int messagesReceived = reports[1].getIntProperty("MESSAGE_COUNT");
Assert.assertEquals("The requested number of messages were not sent.", 50, messagesSent);
- Assert.assertEquals("Sender and receivers messages sent did not match up.", messagesSent, messagesReceived);*/
+ Assert.assertEquals("Sender and receiver messages sent did not match up.", messagesSent, messagesReceived);
}
/**
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase3BasicPubSub.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java
index ad27ca63bd..702c240e9a 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase3BasicPubSub.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java
@@ -18,59 +18,63 @@
* under the License.
*
*/
-package org.apache.qpid.interop.testcases;
-import org.apache.log4j.Logger;
+package org.apache.qpid.interop.coordinator.testcases;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Message;
-import org.apache.qpid.test.framework.distributedtesting.DistributedTestCase;
+import junit.framework.Assert;
-import java.util.Properties;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.interop.coordinator.CoordinatingTestCase;
/**
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Setup pub/sub test parameters and compare with test output. <td> {@link DistributedTestCase}
+ * <tr><td> Setup pub/sub test parameters and compare with test output. <td> {@link CoordinatingTestCase}
* </table>
*/
-public class InteropTestCase3BasicPubSub extends DistributedTestCase
+public class CoordinatingTestCase3BasicPubSub extends CoordinatingTestCase
{
/** Used for debugging. */
- private static final Logger log = Logger.getLogger(InteropTestCase3BasicPubSub.class);
+ private static final Logger log = Logger.getLogger(CoordinatingTestCase3BasicPubSub.class);
/**
* Creates a new coordinating test case with the specified name.
*
* @param name The test case name.
*/
- public InteropTestCase3BasicPubSub(String name)
+ public CoordinatingTestCase3BasicPubSub(String name)
{
super(name);
}
/**
* Performs the basic P2P test case, "Test Case 2" in the specification.
- *
- * @throws Exception Any exceptions are allowed to fall through and fail the test.
*/
public void testBasicPubSub() throws Exception
{
log.debug("public void testBasicPubSub(): called");
- Properties testConfig = new Properties();
+ Map<String, Object> testConfig = new HashMap<String, Object>();
testConfig.put("TEST_NAME", "TC3_BasicPubSub");
testConfig.put("PUBSUB_KEY", "tc3route");
testConfig.put("PUBSUB_NUM_MESSAGES", 10);
testConfig.put("PUBSUB_NUM_RECEIVERS", 5);
- /*Message[] reports =*/ getTestSequencer().sequenceTest(null, null, testConfig);
+ Message[] reports = sequenceTest(testConfig);
- // Compare sender and receivers reports.
- /*int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT");
+ // Compare sender and receiver reports.
+ int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT");
int messagesReceived = reports[1].getIntProperty("MESSAGE_COUNT");
Assert.assertEquals("The requested number of messages were not sent.", 10, messagesSent);
Assert.assertEquals("Received messages did not match up to num sent * num receivers.", messagesSent * 5,
- messagesReceived);*/
+ messagesReceived);
}
/**
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/old/Listener.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/old/Listener.java
new file mode 100644
index 0000000000..5545f8d2dc
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/old/Listener.java
@@ -0,0 +1,291 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop.old;
+
+import java.util.Random;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * Listener implements the listening end of the Qpid interop tests. It is capable of being run as a standalone listener
+ * that responds to the test messages send by the publishing end of the tests implemented by {@link org.apache.qpid.interop.old.Publisher}.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Count messages received on a topic. <td> {@link org.apache.qpid.interop.old.Publisher}
+ * <tr><td> Send reports on messages received, when requested to. <td> {@link org.apache.qpid.interop.old.Publisher}
+ * <tr><td> Shutdown, when requested to. <td> {@link org.apache.qpid.interop.old.Publisher}
+ * <tr><td>
+ *
+ * @todo This doesn't implement the interop test spec yet. Its a port of the old topic tests but has been adapted with
+ * interop spec in mind.
+ *
+ * @todo I've added lots of field table types in the report message, just to check if the other end can decode them
+ * correctly. Not really the right place to test this, so remove them from {@link #sendReport()} once a better
+ * test exists.
+ */
+public class Listener implements MessageListener
+{
+ private static Logger log = Logger.getLogger(Listener.class);
+
+ /** The default AMQ connection URL to use for tests. */
+ public static final String DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
+
+ /** Holds the name of (routing key for) the topic to receive test messages on. */
+ public static final String CONTROL_TOPIC = "topic_control";
+
+ /** Holds the name of (routing key for) the queue to send reports to. */
+ public static final String RESPONSE_QUEUE = "response";
+
+ /** Holds the JMS Topic to receive test messages on. */
+ private final Topic _topic;
+
+ /** Holds the JMS Queue to send reports to. */
+ private final Queue _response;
+
+ /** Holds the connection to listen on. */
+ private final Connection _connection;
+
+ /** Holds the producer to send control messages on. */
+ private final MessageProducer _controller;
+
+ /** Holds the JMS session. */
+ private final javax.jms.Session _session;
+
+ /** Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. */
+ private boolean init;
+
+ /** Holds the count of messages received by this listener. */
+ private int count;
+
+ /** Used to hold the start time of the first message. */
+ private long start;
+
+ /**
+ * Creates a topic listener using the specified broker URL.
+ *
+ * @param connectionUrl The broker URL to listen on.
+ *
+ * @throws AMQException If the broker connection cannot be established.
+ * @throws URLSyntaxException If the broker URL syntax is not correct.
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ Listener(String connectionUrl) throws AMQException, JMSException, URLSyntaxException
+ {
+ log.debug("Listener(String connectionUrl = " + connectionUrl + "): called");
+
+ // Create a connection to the broker.
+ _connection = new AMQConnection(connectionUrl);
+
+ // Establish a session on the broker.
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Set up the destinations to listen for test and control messages on.
+ _topic = _session.createTopic(CONTROL_TOPIC);
+ _response = _session.createQueue(RESPONSE_QUEUE);
+
+ // Set this listener up to listen for incoming messages on the test topic.
+ _session.createConsumer(_topic).setMessageListener(this);
+
+ // Set up this listener with a producer to send the reports on.
+ _controller = _session.createProducer(_response);
+
+ _connection.start();
+ System.out.println("Waiting for messages...");
+ }
+
+ /**
+ * Starts a test subscriber. The broker URL must be specified as the first command line argument.
+ *
+ * @param argv The command line arguments, ignored.
+ *
+ * @todo Add command line arguments to configure all aspects of the test.
+ */
+ public static void main(String[] argv)
+ {
+ try
+ {
+ new Listener(DEFAULT_URI);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Handles all message received by this listener. Test messages are counted, report messages result in a report being sent and
+ * shutdown messages result in this listener being terminated.
+ *
+ * @param message The received message.
+ */
+ public void onMessage(Message message)
+ {
+ log.debug("public void onMessage(Message message = " + message + "): called");
+
+ // Take the start time of the first message if this is the first message.
+ if (!init)
+ {
+ start = System.nanoTime() / 1000000;
+ count = 0;
+ init = true;
+ }
+
+ try
+ {
+ // Check if the message is a control message telling this listener to shut down.
+ if (isShutdown(message))
+ {
+ log.debug("Got a shutdown message.");
+ shutdown();
+ }
+ // Check if the message is a report request message asking this listener to respond with the message count.
+ else if (isReport(message))
+ {
+ log.debug("Got a report request message.");
+
+ // Send the message count report.
+ sendReport();
+
+ // Reset the initialization flag so that the next message is considered to be the first.
+ init = false;
+ }
+ // Otherwise it is an ordinary test message, so increment the message count.
+ else
+ {
+ count++;
+ }
+ }
+ catch (JMSException e)
+ {
+ log.warn("There was a JMSException during onMessage.", e);
+ }
+ }
+
+ /**
+ * Checks a message to see if it is a termination request control message.
+ *
+ * @param m The message to check.
+ *
+ * @return <tt>true</tt> if it is a termination request control message, <tt>false</tt> otherwise.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ boolean isShutdown(Message m) throws JMSException
+ {
+ boolean result = checkTextField(m, "TYPE", "TERMINATION_REQUEST");
+
+ return result;
+ }
+
+ /**
+ * Checks a message to see if it is a report request control message.
+ *
+ * @param m The message to check.
+ *
+ * @return <tt>true</tt> if it is a report request control message, <tt>false</tt> otherwise.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ boolean isReport(Message m) throws JMSException
+ {
+ boolean result = checkTextField(m, "TYPE", "REPORT_REQUEST");
+
+ return result;
+ }
+
+ /**
+ * Checks whether or not a text field on a message has the specified value.
+ *
+ * @param m The message to check.
+ * @param fieldName The name of the field to check.
+ * @param value The expected value of the field to compare with.
+ *
+ * @return <tt>true</tt>If the specified field has the specified value, <tt>fals</tt> otherwise.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
+ private static boolean checkTextField(Message m, String fieldName, String value) throws JMSException
+ {
+ //log.debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName
+ // + ", String value = " + value + "): called");
+
+ String comp = m.getStringProperty(fieldName);
+ //log.debug("comp = " + comp);
+
+ boolean result = (comp != null) && comp.equals(value);
+ //log.debug("result = " + result);
+
+ return result;
+ }
+
+ /**
+ * Closes down the connection to the broker.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ private void shutdown() throws JMSException
+ {
+ _session.close();
+ _connection.stop();
+ _connection.close();
+ }
+
+ /**
+ * Send the report message to the response queue.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ private void sendReport() throws JMSException
+ {
+ log.debug("private void report(): called");
+
+ // Create the report message.
+ long time = ((System.nanoTime() / 1000000) - start);
+ String msg = "Received " + count + " in " + time + "ms";
+ Message message = _session.createTextMessage(msg);
+
+ // Shove some more field table types in the message just to see if the other end can handle it.
+ message.setBooleanProperty("BOOLEAN", true);
+ //message.setByteProperty("BYTE", (byte) 5);
+ message.setDoubleProperty("DOUBLE", Math.PI);
+ message.setFloatProperty("FLOAT", 1.0f);
+ message.setIntProperty("INT", 1);
+ message.setShortProperty("SHORT", (short) 1);
+ message.setLongProperty("LONG", (long) 1827361278);
+ message.setStringProperty("STRING", "hello");
+
+ // Send the report message.
+ _controller.send(message);
+ log.debug("Sent report: " + msg);
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/old/Publisher.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/old/Publisher.java
new file mode 100644
index 0000000000..f3a545f580
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/old/Publisher.java
@@ -0,0 +1,244 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop.old;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * Publisher is the sending end of Qpid interop tests. It is capable of being run as a standalone publisher
+ * that sends test messages to the listening end of the tests implemented by {@link org.apache.qpid.interop.old.Listener}.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td>
+ *
+ * @todo This doesn't implement the interop test spec yet. Its a port of the old topic tests but has been adapted with
+ * interop spec in mind.
+ *
+ * @todo I've added lots of field table types in the report request message, just to check if the other end can decode
+ * them correctly. Not really the right place to test this, so remove them from {@link #doTest()} once a better
+ * test exists.
+ */
+public class Publisher implements MessageListener
+{
+ private static Logger log = Logger.getLogger(Publisher.class);
+
+ /** The default AMQ connection URL to use for tests. */
+ public static final String DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
+
+ /** Holds the default test timeout for broker communications before tests give up. */
+ public static final int TIMEOUT = 3000;
+
+ /** Holds the routing key for the topic to send test messages on. */
+ public static final String CONTROL_TOPIC = "topic_control";
+
+ /** Holds the routing key for the queue to receive reports on. */
+ public static final String RESPONSE_QUEUE = "response";
+
+ /** Holds the JMS Topic to send test messages on. */
+ private final Topic _topic;
+
+ /** Holds the JMS Queue to receive reports on. */
+ private final Queue _response;
+
+ /** Holds the number of messages to send in each test run. */
+ private int numMessages;
+
+ /** A monitor used to wait for all reports to arrive back from consumers on. */
+ private CountDownLatch allReportsReceivedEvt;
+
+ /** Holds the connection to listen on. */
+ private Connection _connection;
+
+ /** Holds the channel for all test messages.*/
+ private Session _session;
+
+ /** Holds the producer to send test messages on. */
+ private MessageProducer publisher;
+
+ /**
+ * Creates a topic publisher that will send the specifed number of messages and expect the specifed number of report back from test
+ * subscribers.
+ *
+ * @param connectionUri The broker URL.
+ * @param numMessages The number of messages to send in each test.
+ * @param numSubscribers The number of subscribes that are expected to reply with a report.
+ */
+ Publisher(String connectionUri, int numMessages, int numSubscribers)
+ throws AMQException, JMSException, URLSyntaxException
+ {
+ log.debug("Publisher(String connectionUri = " + connectionUri + ", int numMessages = " + numMessages
+ + ", int numSubscribers = " + numSubscribers + "): called");
+
+ // Create a connection to the broker.
+ _connection = new AMQConnection(connectionUri);
+
+ // Establish a session on the broker.
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Set up the destinations to send test messages and listen for reports on.
+ _topic = _session.createTopic(CONTROL_TOPIC);
+ _response = _session.createQueue(RESPONSE_QUEUE);
+
+ // Set this listener up to listen for reports on the response queue.
+ _session.createConsumer(_response).setMessageListener(this);
+
+ // Set up this listener with a producer to send the test messages and report requests on.
+ publisher = _session.createProducer(_topic);
+
+ // Keep the test parameters.
+ this.numMessages = numMessages;
+
+ // Set up a countdown to count all subscribers sending their reports.
+ allReportsReceivedEvt = new CountDownLatch(numSubscribers);
+
+ _connection.start();
+ System.out.println("Sending messages and waiting for reports...");
+ }
+
+ /**
+ * Start a test publisher. The broker URL must be specified as the first command line argument.
+ *
+ * @param argv The command line arguments, ignored.
+ *
+ * @todo Add command line arguments to configure all aspects of the test.
+ */
+ public static void main(String[] argv)
+ {
+ try
+ {
+ // Create an instance of this publisher with the command line parameters.
+ Publisher publisher = new Publisher(DEFAULT_URI, 1, 1);
+
+ // Publish the test messages.
+ publisher.doTest();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Sends the test messages and waits for all subscribers to reply with a report.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ public void doTest() throws JMSException
+ {
+ log.debug("public void DoTest(): called");
+
+ // Create a test message to send.
+ Message testMessage = _session.createTextMessage("test");
+
+ // Send the desired number of test messages.
+ for (int i = 0; i < numMessages; i++)
+ {
+ publisher.send(testMessage);
+ }
+
+ log.debug("Sent " + numMessages + " test messages.");
+
+ // Send the report request.
+ Message reportRequestMessage = _session.createTextMessage("Report request message.");
+ reportRequestMessage.setStringProperty("TYPE", "REPORT_REQUEST");
+
+ reportRequestMessage.setBooleanProperty("BOOLEAN", false);
+ //reportRequestMessage.Headers.SetByte("BYTE", 5);
+ reportRequestMessage.setDoubleProperty("DOUBLE", 3.141);
+ reportRequestMessage.setFloatProperty("FLOAT", 1.0f);
+ reportRequestMessage.setIntProperty("INT", 1);
+ reportRequestMessage.setLongProperty("LONG", 1);
+ reportRequestMessage.setStringProperty("STRING", "hello");
+ reportRequestMessage.setShortProperty("SHORT", (short) 2);
+
+ publisher.send(reportRequestMessage);
+
+ log.debug("Sent the report request message, waiting for all replies...");
+
+ // Wait until all the reports come in.
+ try
+ {
+ allReportsReceivedEvt.await(TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ { }
+
+ // Check if all reports were really received or if the timeout occurred.
+ if (allReportsReceivedEvt.getCount() == 0)
+ {
+ log.debug("Got all reports.");
+ }
+ else
+ {
+ log.debug("Waiting for reports timed out, still waiting for " + allReportsReceivedEvt.getCount() + ".");
+ }
+
+ // Send the termination request.
+ Message terminationRequestMessage = _session.createTextMessage("Termination request message.");
+ terminationRequestMessage.setStringProperty("TYPE", "TERMINATION_REQUEST");
+ publisher.send(terminationRequestMessage);
+
+ log.debug("Sent the termination request message.");
+
+ // Close all message producers and consumers and the connection to the broker.
+ shutdown();
+ }
+
+ /**
+ * Handles all report messages from subscribers. This decrements the count of subscribers that are still to reply, until this becomes
+ * zero, at which time waiting threads are notified of this event.
+ *
+ * @param message The received report message.
+ */
+ public void onMessage(Message message)
+ {
+ log.debug("public void OnMessage(Message message = " + message + "): called");
+
+ // Decrement the count of expected messages and release the wait monitor when this becomes zero.
+ allReportsReceivedEvt.countDown();
+
+ if (allReportsReceivedEvt.getCount() == 0)
+ {
+ log.debug("Got reports from all subscribers.");
+ }
+ }
+
+ /**
+ * Stops the message consumers and closes the connection.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ private void shutdown() throws JMSException
+ {
+ _session.close();
+ _connection.close();
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/CircuitTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/CircuitTestCase.java
deleted file mode 100644
index 966a545e16..0000000000
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/CircuitTestCase.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.interop.testcases;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.test.framework.sequencers.TestCaseSequencer;
-import org.apache.qpid.test.framework.Circuit;
-import org.apache.qpid.test.framework.FrameworkBaseCase;
-import org.apache.qpid.test.framework.MessagingTestConfigProperties;
-
-import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
-import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
-
-/**
- * CircuitTestCase runs a test over a {@link Circuit} controlled by the test parameters.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td>
- * </table>
- *
- * @todo When working with test context properties, add overrides to defaults to the singleton instance, but when taking
- * a starting point to add specific test case parameters to, take a copy. Use the copy with test case specifics
- * to control the test.
- */
-public class CircuitTestCase extends FrameworkBaseCase
-{
- /** Used for debugging. */
- private static final Logger log = Logger.getLogger(CircuitTestCase.class);
-
- /**
- * Creates a new test case with the specified name.
- *
- * @param name The test case name.
- */
- public CircuitTestCase(String name)
- {
- super(name);
- }
-
- /**
- * Performs the a basic P2P test case.
- *
- * @throws Exception Any exceptions are allowed to fall through and fail the test.
- */
- public void testBasicP2P() throws Exception
- {
- log.debug("public void testBasicP2P(): called");
-
- // Get the test parameters, any overrides on the command line will have been applied.
- ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
-
- // Customize the test parameters.
- testProps.setProperty("TEST_NAME", "DEFAULT_CIRCUIT_TEST");
- testProps.setProperty(MessagingTestConfigProperties.SEND_DESTINATION_NAME_ROOT_PROPNAME, "testqueue");
-
- // Get the test sequencer to create test circuits and run the standard test procedure through.
- TestCaseSequencer sequencer = getTestSequencer();
-
- // Send the test messages, and check that there were no errors doing so.
- Circuit testCircuit = sequencer.createCircuit(testProps);
- sequencer.sequenceTest(testCircuit, assertionList(testCircuit.getPublisher().noExceptionsAssertion()), testProps);
-
- // Check that all of the message were sent.
- // Check that the receiving end got the same number of messages as the publishing end.
- }
-
- /**
- * Should provide a translation from the junit method name of a test to its test case name as known to the test
- * clients that will run the test. The purpose of this is to convert the JUnit method name into the correct test
- * case name to place into the test invite. For example the method "testP2P" might map onto the interop test case
- * name "TC2_BasicP2P".
- *
- * @param methodName The name of the JUnit test method.
- *
- * @return The name of the corresponding interop test case.
- */
- public String getTestCaseNameForTestMethod(String methodName)
- {
- return "DEFAULT_CIRCUIT_TEST";
- }
-}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/InteropClientTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java
index 5e6d61a9e0..37952d08c8 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/InteropClientTestCase.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.test.framework.distributedtesting;
+package org.apache.qpid.interop.testclient;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -44,11 +44,7 @@ public interface InteropClientTestCase extends MessageListener
/** Defines the possible test case roles that an interop test case can take on. */
public enum Roles
{
- /** Specifies the sender role. */
- SENDER,
-
- /** Specifies the receivers role. */
- RECEIVER
+ SENDER, RECEIVER;
}
/**
@@ -74,7 +70,7 @@ public interface InteropClientTestCase extends MessageListener
* Assigns the role to be played by this test case. The test parameters are fully specified in the
* assignment message. When this method return the test case will be ready to execute.
*
- * @param role The role to be played; sender or receivers.
+ * @param role The role to be played; sender or receiver.
* @param assignRoleMessage The role assingment message, contains the full test parameters.
*
* @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
@@ -82,13 +78,20 @@ public interface InteropClientTestCase extends MessageListener
public void assignRole(Roles role, Message assignRoleMessage) throws JMSException;
/**
- * Performs the test case actions. Returning from here, indicates that the sending role has completed its test.
- *
+ * Performs the test case actions.
+ * return from here when you have finished the test.. this will signal the controller that the test has ended.
* @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
*/
public void start() throws JMSException;
/**
+ * Gives notice of termination of the test case actions.
+ *
+ * @throws JMSException Any JMSException resulting from allowed to fall through.
+ */
+ public void terminate() throws JMSException, InterruptedException;
+
+ /**
* Gets a report on the actions performed by the test case in its assigned role.
*
* @param session The session to create the report message in.
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
index 12c0d0aa69..a904bfa419 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
@@ -18,41 +18,45 @@
* under the License.
*
*/
-package org.apache.qpid.test.framework.distributedtesting;
+package org.apache.qpid.interop.testclient;
import org.apache.log4j.Logger;
-
-import org.apache.qpid.interop.clienttestcases.TestCase1DummyRun;
-import org.apache.qpid.interop.clienttestcases.TestCase2BasicP2P;
-import org.apache.qpid.interop.clienttestcases.TestCase3BasicPubSub;
-import org.apache.qpid.sustained.SustainedClientTestCase;
-import org.apache.qpid.test.framework.MessagingTestConfigProperties;
-import org.apache.qpid.test.framework.TestUtils;
-
-import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
-import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
-
-import javax.jms.*;
-
+import org.apache.qpid.interop.testclient.testcases.TestCase1DummyRun;
+import org.apache.qpid.interop.testclient.testcases.TestCase2BasicP2P;
+import org.apache.qpid.util.CommandLineParser;
+import org.apache.qpid.util.PropertiesUtils;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
/**
* Implements a test client as described in the interop testing spec
* (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). A test client is an agent that
- * reacts to control message sequences send by the test {@link org.apache.qpid.test.framework.distributedtesting.Coordinator}.
+ * reacts to control message sequences send by the test {@link org.apache.qpid.interop.coordinator.Coordinator}.
*
- * <p/><table><caption>Messages Handled by SustainedTestClient</caption>
+ * <p/><table><caption>Messages Handled by TestClient</caption>
* <tr><th> Message <th> Action
* <tr><td> Invite(compulsory) <td> Reply with Enlist.
* <tr><td> Invite(test case) <td> Reply with Enlist if test case available.
* <tr><td> AssignRole(test case) <td> Reply with Accept Role if matches an enlisted test. Keep test parameters.
* <tr><td> Start <td> Send test messages defined by test parameters. Send report on messages sent.
* <tr><td> Status Request <td> Send report on messages received.
- * <tr><td> Terminate <td> Terminate the test client.
* </table>
*
* <p><table id="crc"><caption>CRC Card</caption>
@@ -63,14 +67,12 @@ import java.util.Map;
*/
public class TestClient implements MessageListener
{
- /** Used for debugging. */
- private static final Logger log = Logger.getLogger(TestClient.class);
+ private static Logger log = Logger.getLogger(TestClient.class);
- /** Used for reporting to the console. */
- private static final Logger console = Logger.getLogger("CONSOLE");
-
- /** Holds the default identifying name of the test client. */
+ public static final String CONNECTION_PROPERTY = "connectionfactory.broker";
+ public static final String CONNECTION_NAME = "broker";
public static final String CLIENT_NAME = "java";
+ public static final String DEFAULT_CONNECTION_PROPS_RESOURCE = "org/apache/qpid/interop/connection.properties";
/** Holds the URL of the broker to run the tests on. */
public static String brokerUrl;
@@ -78,34 +80,17 @@ public class TestClient implements MessageListener
/** Holds the virtual host to run the tests on. If <tt>null</tt>, then the default virtual host is used. */
public static String virtualHost;
- /**
- * Holds the test context properties that provides the default test parameters, plus command line overrides.
- * This is initialized with the default test parameters, to which command line overrides may be applied.
- */
- public static ParsedProperties testContextProperties =
- TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
-
/** Holds all the test cases loaded from the classpath. */
Map<String, InteropClientTestCase> testCases = new HashMap<String, InteropClientTestCase>();
- /** Holds the test case currently being run by this client. */
protected InteropClientTestCase currentTestCase;
- /** Holds the connection to the broker that the test is being coordinated on. */
- protected Connection connection;
-
- /** Holds the message producer to hold the test coordination over. */
+ protected Connection _connection;
protected MessageProducer producer;
-
- /** Holds the JMS session for the test coordination. */
protected Session session;
- /** Holds the name of this client, with a default value. */
protected String clientName = CLIENT_NAME;
- /** This flag indicates that the test client should attempt to join the currently running test case on start up. */
- protected boolean join;
-
/**
* Creates a new interop test client, listenting to the specified broker and virtual host, with the specified client
* identifying name.
@@ -114,16 +99,15 @@ public class TestClient implements MessageListener
* @param virtualHost The virtual host to conect to.
* @param clientName The client name to use.
*/
- public TestClient(String brokerUrl, String virtualHost, String clientName, boolean join)
+ public TestClient(String brokerUrl, String virtualHost, String clientName)
{
- log.debug("public SustainedTestClient(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost
- + ", String clientName = " + clientName + "): called");
+ log.debug("public TestClient(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost
+ + ", String clientName = " + clientName + "): called");
// Retain the connection parameters.
this.brokerUrl = brokerUrl;
this.virtualHost = virtualHost;
this.clientName = clientName;
- this.join = join;
}
/**
@@ -140,41 +124,49 @@ public class TestClient implements MessageListener
*/
public static void main(String[] args)
{
- // Override the default broker url to be localhost:5672.
- testContextProperties.setProperty(MessagingTestConfigProperties.BROKER_PROPNAME, "tcp://localhost:5672");
-
- // Use the command line parser to evaluate the command line with standard handling behaviour (print errors
- // and usage then exist if there are errors).
- // Any options and trailing name=value pairs are also injected into the test context properties object,
- // to override any defaults that may have been set up.
- ParsedProperties options =
- new ParsedProperties(uk.co.thebadgerset.junit.extensions.util.CommandLineParser.processCommandLine(args,
- new uk.co.thebadgerset.junit.extensions.util.CommandLineParser(
+ // Use the command line parser to evaluate the command line.
+ CommandLineParser commandLine =
+ new CommandLineParser(
new String[][]
- {
- { "b", "The broker URL.", "broker", "false" },
- { "h", "The virtual host to use.", "virtual host", "false" },
- { "o", "The name of the directory to output test timings to.", "dir", "false" },
- { "n", "The name of the test client.", "name", "false" },
- { "j", "Join this test client to running test.", "false" }
- }), testContextProperties));
+ {
+ {"b", "The broker URL.", "broker", "false"},
+ {"h", "The virtual host to use.", "virtual host", "false"},
+ {"n", "The test client name.", "name", "false"}
+ });
+
+ // Capture the command line arguments or display errors and correct usage and then exit.
+ Properties options = null;
+
+ try
+ {
+ options = commandLine.parseCommandLine(args);
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.out.println(commandLine.getErrors());
+ System.out.println(commandLine.getUsage());
+ System.exit(1);
+ }
// Extract the command line options.
String brokerUrl = options.getProperty("b");
String virtualHost = options.getProperty("h");
String clientName = options.getProperty("n");
- boolean join = options.getPropertyAsBoolean("j");
+
+ // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up
+ // overridden values from there.
+ commandLine.addCommandLineToSysProperties();
// Create a test client and start it running.
- TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName, join);
+ TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName);
// Use a class path scanner to find all the interop test case implementations.
- // Hard code the test classes till the classpath scanner is fixed.
Collection<Class<? extends InteropClientTestCase>> testCaseClasses =
- new ArrayList<Class<? extends InteropClientTestCase>>();
+ new ArrayList<Class<? extends InteropClientTestCase>>();
// ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true);
- Collections.addAll(testCaseClasses, TestCase1DummyRun.class, TestCase2BasicP2P.class, TestCase3BasicPubSub.class,
- SustainedClientTestCase.class);
+ // Hard code the test classes till the classpath scanner is fixed.
+ Collections.addAll(testCaseClasses,
+ new Class[]{TestCase1DummyRun.class, TestCase2BasicP2P.class, TestClient.class});
try
{
@@ -190,10 +182,7 @@ public class TestClient implements MessageListener
/**
* Starts the interop test client running. This causes it to start listening for incoming test invites.
*
- * @param testCaseClasses The classes of the available test cases. The test case names from these are used to
- * matchin incoming test invites against.
- *
- * @throws JMSException Any underlying JMSExceptions are allowed to fall through.
+ * @throws JMSException Any underlying JMSExceptions are allowed to fall through. @param testCaseClasses
*/
protected void start(Collection<Class<? extends InteropClientTestCase>> testCaseClasses) throws JMSException
{
@@ -220,36 +209,84 @@ public class TestClient implements MessageListener
}
// Open a connection to communicate with the coordinator on.
- connection = TestUtils.createConnection(testContextProperties);
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, clientName, brokerUrl, virtualHost);
+
+ session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Set this up to listen for control messages.
- Topic privateControlTopic = session.createTopic("iop.control." + clientName);
- MessageConsumer consumer = session.createConsumer(privateControlTopic);
+ MessageConsumer consumer = session.createConsumer(session.createTopic("iop.control." + clientName));
consumer.setMessageListener(this);
- Topic controlTopic = session.createTopic("iop.control");
- MessageConsumer consumer2 = session.createConsumer(controlTopic);
+ MessageConsumer consumer2 = session.createConsumer(session.createTopic("iop.control"));
consumer2.setMessageListener(this);
// Create a producer to send replies with.
producer = session.createProducer(null);
- // If the join flag was set, then broadcast a join message to notify the coordinator that a new test client
- // is available to join the current test case, if it supports it. This message may be ignored, or it may result
- // in this test client receiving a test invite.
- if (join)
+ // Start listening for incoming control messages.
+ _connection.start();
+ }
+
+
+ public static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost)
+ {
+ return createConnection(connectionPropsResource, "clientID", brokerUrl, virtualHost);
+ }
+
+ /**
+ * Establishes a JMS connection using a properties file and qpids built in JNDI implementation. This is a simple
+ * convenience method for code that does anticipate handling connection failures. All exceptions that indicate that
+ * the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure handler.
+ *
+ * @param connectionPropsResource The name of the connection properties file.
+ * @param clientID
+ * @param brokerUrl The broker url to connect to, <tt>null</tt> to use the default from the
+ * properties.
+ * @param virtualHost The virtual host to connectio to, <tt>null</tt> to use the default.
+ *
+ * @return A JMS conneciton.
+ *
+ * @todo Make username/password configurable. Allow multiple urls for fail over. Once it feels right, move it to a
+ * Utils library class.
+ */
+ public static Connection createConnection(String connectionPropsResource, String clientID, String brokerUrl, String virtualHost)
+ {
+ log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource
+ + ", String brokerUrl = " + brokerUrl + ", String clientID = " + clientID
+ + ", String virtualHost = " + virtualHost + " ): called");
+
+ try
{
- Message joinMessage = session.createMessage();
+ Properties connectionProps =
+ PropertiesUtils.getProperties(TestClient.class.getClassLoader().getResourceAsStream(
+ connectionPropsResource));
- joinMessage.setStringProperty("CONTROL_TYPE", "JOIN");
- joinMessage.setStringProperty("CLIENT_NAME", clientName);
- joinMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName);
- producer.send(controlTopic, joinMessage);
- }
+ if (brokerUrl != null)
+ {
+ String connectionString =
+ "amqp://guest:guest@" + clientID + "/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'";
+ connectionProps.setProperty(CONNECTION_PROPERTY, connectionString);
+ }
- // Start listening for incoming control messages.
- connection.start();
+ Context ctx = new InitialContext(connectionProps);
+
+ ConnectionFactory cf = (ConnectionFactory) ctx.lookup(CONNECTION_NAME);
+ Connection connection = cf.createConnection();
+
+ return connection;
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (NamingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException(e);
+ }
}
/**
@@ -290,7 +327,7 @@ public class TestClient implements MessageListener
}
else
{
- log.debug("Received an invite to the test '" + testName + "' but this test is not known.");
+ log.warn("'" + testName + "' not part of this clients tests.");
}
}
else
@@ -357,8 +394,16 @@ public class TestClient implements MessageListener
{
log.info("Received termination instruction from coordinator.");
+// try
+// {
+// currentTestCase.terminate();
+// }
+// catch (InterruptedException e)
+// {
+// //
+// }
// Is a cleaner shutdown needed?
- connection.close();
+ _connection.close();
System.exit(0);
}
else
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase1DummyRun.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java
index b119d13a3d..5f257c0b36 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase1DummyRun.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java
@@ -18,11 +18,11 @@
* under the License.
*
*/
-package org.apache.qpid.interop.clienttestcases;
+package org.apache.qpid.interop.testclient.testcases;
import org.apache.log4j.Logger;
-import org.apache.qpid.test.framework.distributedtesting.InteropClientTestCase;
+import org.apache.qpid.interop.testclient.InteropClientTestCase;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -43,15 +43,8 @@ import javax.jms.Session;
*/
public class TestCase1DummyRun implements InteropClientTestCase
{
- /** Used for debugging. */
private static final Logger log = Logger.getLogger(TestCase1DummyRun.class);
- /**
- * Should provide the name of the test case that this class implements. The exact names are defined in the
- * interop testing spec.
- *
- * @return The name of the test case that this implements.
- */
public String getName()
{
log.debug("public String getName(): called");
@@ -59,15 +52,6 @@ public class TestCase1DummyRun implements InteropClientTestCase
return "TC1_DummyRun";
}
- /**
- * Determines whether the test invite that matched this test case is acceptable.
- *
- * @param inviteMessage The invitation to accept or reject.
- *
- * @return <tt>true</tt> to accept the invitation, <tt>false</tt> to reject it.
- *
- * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
- */
public boolean acceptInvite(Message inviteMessage) throws JMSException
{
log.debug("public boolean acceptInvite(Message inviteMessage): called");
@@ -76,15 +60,6 @@ public class TestCase1DummyRun implements InteropClientTestCase
return true;
}
- /**
- * Assigns the role to be played by this test case. The test parameters are fully specified in the
- * assignment message. When this method return the test case will be ready to execute.
- *
- * @param role The role to be played; sender or receivers.
- * @param assignRoleMessage The role assingment message, contains the full test parameters.
- *
- * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
- */
public void assignRole(Roles role, Message assignRoleMessage) throws JMSException
{
log.debug("public void assignRole(Roles role, Message assignRoleMessage): called");
@@ -92,9 +67,6 @@ public class TestCase1DummyRun implements InteropClientTestCase
// Do nothing, both roles are the same.
}
- /**
- * Performs the test case actions. Returning from here, indicates that the sending role has completed its test.
- */
public void start()
{
log.debug("public void start(): called");
@@ -102,15 +74,11 @@ public class TestCase1DummyRun implements InteropClientTestCase
// Do nothing.
}
- /**
- * Gets a report on the actions performed by the test case in its assigned role.
- *
- * @param session The session to create the report message in.
- *
- * @return The report message.
- *
- * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through.
- */
+ public void terminate() throws JMSException
+ {
+ //todo
+ }
+
public Message getReport(Session session) throws JMSException
{
log.debug("public Message getReport(Session session): called");
@@ -119,11 +87,6 @@ public class TestCase1DummyRun implements InteropClientTestCase
return session.createTextMessage("Dummy Run, Ok.");
}
- /**
- * Handles incoming test messages. Does nothing.
- *
- * @param message The incoming test message.
- */
public void onMessage(Message message)
{
log.debug("public void onMessage(Message message = " + message + "): called");
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase2BasicP2P.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java
index 080bd846ee..ff56ee9b93 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase2BasicP2P.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java
@@ -18,15 +18,14 @@
* under the License.
*
*/
-package org.apache.qpid.interop.clienttestcases;
+package org.apache.qpid.interop.testclient.testcases;
+
+import javax.jms.*;
import org.apache.log4j.Logger;
-import org.apache.qpid.test.framework.distributedtesting.InteropClientTestCase;
-import org.apache.qpid.test.framework.distributedtesting.TestClient;
-import org.apache.qpid.test.framework.TestUtils;
-
-import javax.jms.*;
+import org.apache.qpid.interop.testclient.InteropClientTestCase;
+import org.apache.qpid.interop.testclient.TestClient;
/**
* Implements test case 2, basic P2P. Sends/received a specified number of messages to a specified route on the
@@ -55,6 +54,9 @@ public class TestCase2BasicP2P implements InteropClientTestCase
/** The number of test messages to send. */
private int numMessages;
+ /** The routing key to send them to on the default direct exchange. */
+ private Destination sendDestination;
+
/** The connection to send the test messages on. */
private Connection connection;
@@ -98,7 +100,7 @@ public class TestCase2BasicP2P implements InteropClientTestCase
* Assigns the role to be played by this test case. The test parameters are fully specified in the
* assignment message. When this method return the test case will be ready to execute.
*
- * @param role The role to be played; sender or receivers.
+ * @param role The role to be played; sender or receiver.
*
* @param assignRoleMessage The role assingment message, contains the full test parameters.
*
@@ -116,12 +118,14 @@ public class TestCase2BasicP2P implements InteropClientTestCase
this.role = role;
// Create a new connection to pass the test messages on.
- connection = TestUtils.createConnection(TestClient.testContextProperties);
+ connection =
+ TestClient.createConnection(TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, TestClient.brokerUrl,
+ TestClient.virtualHost);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Extract and retain the test parameters.
numMessages = assignRoleMessage.getIntProperty("P2P_NUM_MESSAGES");
- Destination sendDestination = session.createQueue(assignRoleMessage.getStringProperty("P2P_QUEUE_AND_KEY_NAME"));
+ sendDestination = session.createQueue(assignRoleMessage.getStringProperty("P2P_QUEUE_AND_KEY_NAME"));
log.debug("numMessages = " + numMessages);
log.debug("sendDestination = " + sendDestination);
@@ -134,7 +138,7 @@ public class TestCase2BasicP2P implements InteropClientTestCase
producer = session.createProducer(sendDestination);
break;
- // Otherwise the receivers role is being assigned, so set this up to listen for messages.
+ // Otherwise the receiver role is being assigned, so set this up to listen for messages.
case RECEIVER:
MessageConsumer consumer = session.createConsumer(sendDestination);
consumer.setMessageListener(this);
@@ -145,9 +149,7 @@ public class TestCase2BasicP2P implements InteropClientTestCase
}
/**
- * Performs the test case actions. Returning from here, indicates that the sending role has completed its test.
- *
- * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
+ * Performs the test case actions.
*/
public void start() throws JMSException
{
@@ -168,6 +170,11 @@ public class TestCase2BasicP2P implements InteropClientTestCase
}
}
+ public void terminate() throws JMSException
+ {
+ //todo
+ }
+
/**
* Gets a report on the actions performed by the test case in its assigned role.
*
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase3BasicPubSub.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java
index a11d045e89..7b35142c82 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase3BasicPubSub.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java
@@ -18,19 +18,18 @@
* under the License.
*
*/
-package org.apache.qpid.interop.clienttestcases;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.test.framework.distributedtesting.InteropClientTestCase;
-import org.apache.qpid.test.framework.distributedtesting.TestClient;
-import org.apache.qpid.test.framework.TestUtils;
+package org.apache.qpid.interop.testclient.testcases;
import javax.jms.*;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.interop.testclient.InteropClientTestCase;
+
/**
* Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the
- * default topic exchange, using the specified number of receivers connections. Produces reports on the actual number of
+ * default topic exchange, using the specified number of receiver connections. Produces reports on the actual number of
* messages sent/received.
*
* <p><table id="crc"><caption>CRC Card</caption>
@@ -56,6 +55,12 @@ public class TestCase3BasicPubSub implements InteropClientTestCase
/** The number of test messages to send. */
private int numMessages;
+ /** The number of receiver connection to use. */
+ private int numReceivers;
+
+ /** The routing key to send them to on the default direct exchange. */
+ private Destination sendDestination;
+
/** The connections to send/receive the test messages on. */
private Connection[] connection;
@@ -99,7 +104,7 @@ public class TestCase3BasicPubSub implements InteropClientTestCase
* Assigns the role to be played by this test case. The test parameters are fully specified in the
* assignment message. When this method return the test case will be ready to execute.
*
- * @param role The role to be played; sender or receivers.
+ * @param role The role to be played; sender or receiver.
*
* @param assignRoleMessage The role assingment message, contains the full test parameters.
*
@@ -118,7 +123,7 @@ public class TestCase3BasicPubSub implements InteropClientTestCase
// Extract and retain the test parameters.
numMessages = assignRoleMessage.getIntProperty("PUBSUB_NUM_MESSAGES");
- int numReceivers = assignRoleMessage.getIntProperty("PUBSUB_NUM_RECEIVERS");
+ numReceivers = assignRoleMessage.getIntProperty("PUBSUB_NUM_RECEIVERS");
String sendKey = assignRoleMessage.getStringProperty("PUBSUB_KEY");
log.debug("numMessages = " + numMessages);
@@ -134,25 +139,29 @@ public class TestCase3BasicPubSub implements InteropClientTestCase
connection = new Connection[1];
session = new Session[1];
- connection[0] = TestUtils.createConnection(TestClient.testContextProperties);
+ connection[0] =
+ org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl,
+ org.apache.qpid.interop.testclient.TestClient.virtualHost);
session[0] = connection[0].createSession(false, Session.AUTO_ACKNOWLEDGE);
// Extract and retain the test parameters.
- Destination sendDestination = session[0].createTopic(sendKey);
+ sendDestination = session[0].createTopic(sendKey);
producer = session[0].createProducer(sendDestination);
break;
- // Otherwise the receivers role is being assigned, so set this up to listen for messages on the required number
- // of receivers connections.
+ // Otherwise the receiver role is being assigned, so set this up to listen for messages on the required number
+ // of receiver connections.
case RECEIVER:
- // Create the required number of receivers connections.
+ // Create the required number of receiver connections.
connection = new Connection[numReceivers];
session = new Session[numReceivers];
for (int i = 0; i < numReceivers; i++)
{
- connection[i] = TestUtils.createConnection(TestClient.testContextProperties);
+ connection[i] =
+ org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl,
+ org.apache.qpid.interop.testclient.TestClient.virtualHost);
session[i] = connection[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
sendDestination = session[i].createTopic(sendKey);
@@ -165,16 +174,14 @@ public class TestCase3BasicPubSub implements InteropClientTestCase
}
// Start all the connection dispatcher threads running.
- for (Connection conn : connection)
+ for (int i = 0; i < connection.length; i++)
{
- conn.start();
+ connection[i].start();
}
}
/**
- * Performs the test case actions. Returning from here, indicates that the sending role has completed its test.
- *
- * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
+ * Performs the test case actions.
*/
public void start() throws JMSException
{
@@ -195,6 +202,11 @@ public class TestCase3BasicPubSub implements InteropClientTestCase
}
}
+ public void terminate() throws JMSException, InterruptedException
+ {
+ //todo
+ }
+
/**
* Gets a report on the actions performed by the test case in its assigned role.
*
@@ -209,9 +221,9 @@ public class TestCase3BasicPubSub implements InteropClientTestCase
log.debug("public Message getReport(Session session): called");
// Close the test connections.
- for (Connection conn : connection)
+ for (int i = 0; i < connection.length; i++)
{
- conn.close();
+ connection[i].close();
}
// Generate a report message containing the count of the number of messages passed.
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java
deleted file mode 100644
index 65e05fab4b..0000000000
--- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java
+++ /dev/null
@@ -1,905 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.sustained;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.client.AMQNoConsumersException;
-import org.apache.qpid.client.AMQNoRouteException;
-import org.apache.qpid.test.framework.distributedtesting.TestClient;
-import org.apache.qpid.interop.clienttestcases.TestCase3BasicPubSub;
-import org.apache.qpid.test.framework.TestUtils;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-/**
- * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the
- * default topic exchange, using the specified number of receivers connections. Produces reports on the actual number of
- * messages sent/received.
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Supply the name of the test case that this implements.
- * <tr><td> Accept/Reject invites based on test parameters.
- * <tr><td> Adapt to assigned roles.
- * <tr><td> Send required number of test messages using pub/sub. <tr><td> Generate test reports.
- * </table>
- */
-public class SustainedClientTestCase extends TestCase3BasicPubSub implements ExceptionListener
-{
- /** Used for debugging. */
- private static final Logger log = Logger.getLogger(SustainedClientTestCase.class);
-
- /** Used to log to the console. */
- private static final Logger console = Logger.getLogger("SustainedTest");
-
- /** The role to be played by the test. */
- private Roles role;
-
- /** The number of receivers connection to use. */
- private int numReceivers;
-
- /** The routing key to send them to on the default direct exchange. */
- private Destination sendDestination;
-
- /** The routing key to send updates to on the default direct exchange. */
- private Destination sendUpdateDestination;
-
- /** The connections to send/receive the test messages on. */
- private Connection[] connection;
-
- /** The sessions to send/receive the test messages on. */
- private Session[] session;
-
- /** The producer to send the test messages with. */
- MessageProducer producer;
-
- /** Adapter that adjusts the send rate based on the updates from clients. */
- SustainedRateAdapter _rateAdapter;
-
- /** */
- int _batchSize;
-
- private static final long TEN_MILLI_SEC = 10000000;
- private static final int DEBUG_LOG_UPATE_INTERVAL = 10;
- private static final int LOG_UPATE_INTERVAL = 10;
- private static final boolean SLEEP_PER_MESSAGE = Boolean.getBoolean("sleepPerMessage");
-
- /**
- * Should provide the name of the test case that this class implements. The exact names are defined in the interop
- * testing spec.
- *
- * @return The name of the test case that this implements.
- */
- public String getName()
- {
- log.debug("public String getName(): called");
-
- return "Perf_SustainedPubSub";
- }
-
- /**
- * Assigns the role to be played by this test case. The test parameters are fully specified in the assignment
- * message. When this method return the test case will be ready to execute.
- *
- * @param role The role to be played; sender or receivers.
- * @param assignRoleMessage The role assingment message, contains the full test parameters.
- *
- * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
- */
- public void assignRole(Roles role, Message assignRoleMessage) throws JMSException
- {
- log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage
- + "): called");
-
- // Take note of the role to be played.
- this.role = role;
-
- // Extract and retain the test parameters.
- numReceivers = assignRoleMessage.getIntProperty("SUSTAINED_NUM_RECEIVERS");
- _batchSize = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL");
- String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY");
- String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY");
- int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE");
- String clientName = assignRoleMessage.getStringProperty("CLIENT_NAME");
-
- if (log.isDebugEnabled())
- {
- log.debug("numReceivers = " + numReceivers);
- log.debug("_batchSize = " + _batchSize);
- log.debug("ackMode = " + ackMode);
- log.debug("sendKey = " + sendKey);
- log.debug("sendUpdateKey = " + sendUpdateKey);
- log.debug("role = " + role);
- }
-
- switch (role)
- {
- // Check if the sender role is being assigned, and set up a single message producer if so.
- case SENDER:
- console.info("Creating Sender");
- // Create a new connection to pass the test messages on.
- connection = new Connection[1];
- session = new Session[1];
-
- connection[0] = TestUtils.createConnection(TestClient.testContextProperties);
- session[0] = connection[0].createSession(false, ackMode);
-
- // Extract and retain the test parameters.
- sendDestination = session[0].createTopic(sendKey);
-
- connection[0].setExceptionListener(this);
-
- producer = session[0].createProducer(sendDestination);
-
- sendUpdateDestination = session[0].createTopic(sendUpdateKey);
- MessageConsumer updateConsumer = session[0].createConsumer(sendUpdateDestination);
-
- _rateAdapter = new SustainedRateAdapter(this);
- updateConsumer.setMessageListener(_rateAdapter);
-
- break;
-
- // Otherwise the receivers role is being assigned, so set this up to listen for messages on the required number
- // of receivers connections.
- case RECEIVER:
- console.info("Creating Receiver");
- // Create the required number of receivers connections.
- connection = new Connection[numReceivers];
- session = new Session[numReceivers];
-
- for (int i = 0; i < numReceivers; i++)
- {
- connection[i] = TestUtils.createConnection(TestClient.testContextProperties);
- session[i] = connection[i].createSession(false, ackMode);
-
- sendDestination = session[i].createTopic(sendKey);
-
- sendUpdateDestination = session[i].createTopic(sendUpdateKey);
-
- MessageConsumer consumer = session[i].createConsumer(sendDestination);
-
- consumer.setMessageListener(new SustainedListener(clientName + "-" + i, _batchSize, session[i],
- sendUpdateDestination));
- }
-
- break;
- }
-
- // Start all the connection dispatcher threads running.
- for (int i = 0; i < connection.length; i++)
- {
- connection[i].start();
- }
- }
-
- /** Performs the test case actions. */
- public void start() throws JMSException
- {
- log.debug("public void start(): called");
-
- // Check that the sender role is being performed.
- switch (role)
- {
- // Check if the sender role is being assigned, and set up a single message producer if so.
- case SENDER:
- _rateAdapter.run();
- break;
- case RECEIVER:
-
- }
-
- // return from here when you have finished the test.. this will signal the controller and
- }
-
- public void terminate() throws JMSException, InterruptedException
- {
- if (_rateAdapter != null)
- {
- _rateAdapter.stop();
- }
- }
-
- /**
- * Gets a report on the actions performed by the test case in its assigned role.
- *
- * @param session The session to create the report message in.
- *
- * @return The report message.
- *
- * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through.
- */
- public Message getReport(Session session) throws JMSException
- {
- log.debug("public Message getReport(Session session): called");
-
- // Close the test connections.
- for (int i = 0; i < connection.length; i++)
- {
- connection[i].close();
- }
-
- Message report = session.createMessage();
- report.setStringProperty("CONTROL_TYPE", "REPORT");
-
- return report;
- }
-
- public void onException(JMSException jmsException)
- {
- Exception linked = jmsException.getLinkedException();
-
- if (linked != null)
- {
- if (log.isDebugEnabled())
- {
- log.debug("Linked Exception:" + linked);
- }
-
- if ((linked instanceof AMQNoRouteException) || (linked instanceof AMQNoConsumersException))
- {
- if (log.isDebugEnabled())
- {
- if (linked instanceof AMQNoConsumersException)
- {
- log.warn("No clients currently available for message:"
- + ((AMQNoConsumersException) linked).getUndeliveredMessage());
- }
- else
- {
- log.warn("No route for message");
- }
- }
-
- // Tell the rate adapter that there are no clients ready yet
- _rateAdapter.NO_CLIENTS = true;
- }
- }
- else
- {
- log.warn("Exception:" + linked);
- }
- }
-
- /**
- * Inner class that listens for messages and sends a report for the time taken between receiving the 'start' and
- * 'end' messages.
- */
- class SustainedListener implements MessageListener
- {
- /** Number of messages received */
- private long _received = 0;
- /** The number of messages in the batch */
- private int _batchSize = 0;
- /** Record of the when the 'start' messagse was sen */
- private Long _startTime;
- /** Message producer to use to send reports */
- MessageProducer _updater;
- /** Session to create the report message on */
- Session _session;
- /** Record of the client ID used for this SustainedListnener */
- String _client;
-
- /**
- * Main Constructor
- *
- * @param clientname The _client id used to identify this connection.
- * @param batchSize The number of messages that are to be sent per batch. Note: This is not used to
- * control the interval between sending reports.
- * @param session The session used for communication.
- * @param sendDestination The destination that update reports should be sent to.
- *
- * @throws JMSException My occur if creatingthe Producer fails
- */
- public SustainedListener(String clientname, int batchSize, Session session, Destination sendDestination)
- throws JMSException
- {
- _batchSize = batchSize;
- _client = clientname;
- _session = session;
- _updater = session.createProducer(sendDestination);
- }
-
- public void onMessage(Message message)
- {
- if (log.isTraceEnabled())
- {
- log.trace("Message " + _received + "received in listener");
- }
-
- if (message instanceof TextMessage)
- {
- try
- {
- _received++;
- if (((TextMessage) message).getText().equals("start"))
- {
- log.debug("Starting Batch");
- _startTime = System.nanoTime();
- }
- else if (((TextMessage) message).getText().equals("end"))
- {
- if (_startTime != null)
- {
- long currentTime = System.nanoTime();
- sendStatus(currentTime - _startTime, _received, message.getIntProperty("BATCH"));
- log.debug("End Batch");
- }
- }
- }
- catch (JMSException e)
- {
- // ignore error
- }
- }
-
- }
-
- /**
- * sendStatus creates and sends the report back to the publisher
- *
- * @param time taken for the the last batch
- * @param received Total number of messages received.
- * @param batchNumber the batch number
- * @throws JMSException if an error occurs during the send
- */
- private void sendStatus(long time, long received, int batchNumber) throws JMSException
- {
- Message updateMessage = _session.createTextMessage("update");
- updateMessage.setStringProperty("CLIENT_ID", ":" + _client);
- updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE");
- updateMessage.setLongProperty("RECEIVED", received);
- updateMessage.setIntProperty("BATCH", batchNumber);
- updateMessage.setLongProperty("DURATION", time);
-
- if (log.isInfoEnabled())
- {
- log.info("**** SENDING [" + batchNumber + "]**** " + "CLIENT_ID:" + _client + " RECEIVED:" + received
- + " BATCH:" + batchNumber + " DURATION:" + time);
- }
-
- // Output on the main console.info the details of this batch
- if ((batchNumber % 10) == 0)
- {
- console.info("Sending Report [" + batchNumber + "] " + "CLIENT_ID:" + _client + " RECEIVED:" + received
- + " BATCH:" + batchNumber + " DURATION:" + time);
- }
-
- _updater.send(updateMessage);
- }
- }
-
- /**
- * This class is used here to adjust the _delay value which in turn is used to control the number of messages/second
- * that are sent through the test system.
- *
- * By keeping a record of the messages recevied and the average time taken to process the batch size can be
- * calculated and so the delay can be adjusted to maintain that rate.
- *
- * Given that delays of < 10ms can be rounded up the delay is only used between messages if the _delay > 10ms * no
- * messages in the batch. Otherwise the delay is used at the end of the batch.
- */
- class SustainedRateAdapter implements MessageListener, Runnable
- {
- private SustainedClientTestCase _client;
- private long _batchVariance = Integer.getInteger("batchVariance", 3); // no. batches to allow drifting
- private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms)
- private volatile long _delay; // in nanos
- private long _sent;
- private Map<String, Long> _slowClients = new HashMap<String, Long>();
- private static final long PAUSE_SLEEP = TEN_MILLI_SEC / 1000; // 10 ms
- private static final long NO_CLIENT_SLEEP = 1000; // 1s
- private volatile boolean NO_CLIENTS = true;
- private int _delayShifting;
- private final int REPORTS_WITHOUT_CHANGE = Integer.getInteger("stableReportCount", 5);
- private boolean _warmedup = false;
- private static final long EXPECTED_TIME_PER_BATCH = 100000L;
- private int _warmUpBatches = Integer.getInteger("warmUpBatches", 10);
-
- SustainedRateAdapter(SustainedClientTestCase client)
- {
- _client = client;
- }
-
- public void onMessage(Message message)
- {
- if (log.isDebugEnabled())
- {
- log.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called");
- }
-
- try
- {
- String controlType = message.getStringProperty("CONTROL_TYPE");
-
- // Check if the message is a test invite.
- if ("UPDATE".equals(controlType))
- {
- NO_CLIENTS = false;
- long duration = message.getLongProperty("DURATION");
- long totalReceived = message.getLongProperty("RECEIVED");
- String client = message.getStringProperty("CLIENT_ID");
- int batchNumber = message.getIntProperty("BATCH");
-
- if (log.isInfoEnabled() && ((batchNumber % DEBUG_LOG_UPATE_INTERVAL) == 0))
- {
- log.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + " Recevied BATCH:"
- + batchNumber + " DURATION:" + duration);
- }
-
- recordSlow(client, totalReceived, batchNumber);
-
- adjustDelay(client, batchNumber, duration);
-
- // Warm up completes when:
- // we haven't warmed up
- // and the number of batches sent to each client is at least half of the required warmup batches
- if (!_warmedup && (batchNumber >= _warmUpBatches))
- {
- _warmedup = true;
- _warmup.countDown();
-
- }
- }
- }
- catch (JMSException e)
- {
- //
- }
- }
-
- CountDownLatch _warmup = new CountDownLatch(1);
-
- int _numBatches = 10000;
-
- // long[] _timings = new long[_numBatches];
- private boolean _running = true;
-
- public void run()
- {
- console.info("Warming up");
-
- doBatch(_warmUpBatches);
-
- try
- {
- // wait for warmup to complete.
- _warmup.await();
-
- // set delay to the average length of the batches
- _delay = _totalDuration / _warmUpBatches / delays.size();
-
- console.info("Warmup complete delay set : " + _delay + " based on _totalDuration: " + _totalDuration
- + " over no. batches: " + _warmUpBatches + " with client count: " + delays.size());
-
- _totalDuration = 0L;
- _totalReceived = 0L;
- _sent = 0L;
- }
- catch (InterruptedException e)
- {
- //
- }
-
- doBatch(_numBatches);
-
- }
-
- private void doBatch(int batchSize) // long[] timings,
- {
- TextMessage testMessage = null;
- try
- {
- testMessage = _client.session[0].createTextMessage("start");
-
- for (int batch = 0; batch <= batchSize; batch++)
- // while (_running)
- {
- long start = System.nanoTime();
-
- testMessage.setText("start");
- testMessage.setIntProperty("BATCH", batch);
-
- _client.producer.send(testMessage);
- _rateAdapter.sentMessage();
-
- testMessage.setText("test");
- // start at 2 so start and end count as part of batch
- for (int m = 2; m < _batchSize; m++)
- {
- _client.producer.send(testMessage);
- _rateAdapter.sentMessage();
- }
-
- testMessage.setText("end");
- _client.producer.send(testMessage);
- _rateAdapter.sentMessage();
-
- long end = System.nanoTime();
-
- long sendtime = end - start;
-
- if (log.isDebugEnabled())
- {
- log.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime); // timings[batch]);
- }
-
- if ((batch % LOG_UPATE_INTERVAL) == 0)
- {
- console.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status());
- }
-
- _rateAdapter.sleepBatch();
-
- }
- }
- catch (JMSException e)
- {
- console.error("Runner ended");
- }
- }
-
- private String status()
- {
- return " TotalDuration: " + _totalDuration + " for " + delays.size() + " consumers" + " Delay is " + _delay
- + " resulting in "
- + ((_delay > (TEN_MILLI_SEC * _batchSize)) ? ((_delay / _batchSize) + "/msg") : (_delay + "/batch"));
- }
-
- private void sleepBatch()
- {
- if (checkForSlowClients())
- { // if there werwe slow clients we have already slept so don't sleep anymore again.
- return;
- }
-
- if (!SLEEP_PER_MESSAGE)
- {
- // per batch sleep.. if sleep is to small to spread over the batch.
- if (_delay <= (TEN_MILLI_SEC * _batchSize))
- {
- sleepLong(_delay);
- }
- else
- {
- log.info("Not sleeping _delay > ten*batch is:" + _delay);
- }
- }
- }
-
- public void stop()
- {
- _running = false;
- }
-
- Map<String, Long> delays = new HashMap<String, Long>();
- Long _totalReceived = 0L;
- Long _totalDuration = 0L;
- int _skipUpdate = 0;
-
- /**
- * Adjust the delay for sending messages based on this update from the client
- *
- * @param client The client that send this update
- * @param duration The time taken for the last batch of messagse
- * @param batchNumber The reported batchnumber from the client
- */
- private void adjustDelay(String client, int batchNumber, long duration)
- {
- // Retrieve the current total time taken for this client.
- Long currentTime = delays.get(client);
-
- // Add the new duration time to this client
- if (currentTime == null)
- {
- currentTime = duration;
- }
- else
- {
- currentTime += duration;
- }
-
- delays.put(client, currentTime);
-
- long batchesSent = _sent / _batchSize;
-
- // ensure we don't divide by zero
- if (batchesSent == 0)
- {
- batchesSent = 1L;
- }
-
- _totalReceived += _batchSize;
- _totalDuration += duration;
-
- // calculate average duration accross clients per batch
- long averageDuration = _totalDuration / delays.size() / batchesSent;
-
- // calculate the difference between current send delay and average report delay
- long diff = (duration) - averageDuration;
-
- if (log.isInfoEnabled() && ((batchNumber % DEBUG_LOG_UPATE_INTERVAL) == 0))
- {
- log.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers." + " on batch: "
- + batchesSent + " received batch: " + batchNumber + " Batch Duration: " + duration + " Average: "
- + averageDuration + " so diff: " + diff + " for : " + client + " Delay is " + _delay + " resulting in "
- + ((_delay > (TEN_MILLI_SEC * _batchSize)) ? ((_delay / _batchSize) + "/msg") : (_delay + "/batch")));
- }
-
- // if the averageDuration differs from the current by more than the specified variane then adjust delay.
- if (Math.abs(diff) > _timeVariance)
- {
-
- // if the the _delay is larger than the required duration to send report
- // speed up
- if (diff > TEN_MILLI_SEC)
- {
- _delay -= TEN_MILLI_SEC;
-
- if (_delay < 0)
- {
- _delay = 0;
- log.info("Reset _delay to 0");
- delayStable();
- }
- else
- {
- delayChanged();
- }
-
- }
- else if (diff < 0) // diff < 0 diff cannot be 0 as it is > _timeVariance
- {
- // the report took longer
- _delay += TEN_MILLI_SEC;
- delayChanged();
- }
- }
- else
- {
- delayStable();
- }
-
- // If we have a consumer that is behind with the batches.
- if ((batchesSent - batchNumber) > _batchVariance)
- {
- log.debug("Increasing _delay as sending more than receiving");
-
- _delay += 2 * TEN_MILLI_SEC;
- delayChanged();
- }
-
- }
-
- /** Reset the number of iterations before we say the delay has stabilised. */
- private void delayChanged()
- {
- _delayShifting = REPORTS_WITHOUT_CHANGE;
- }
-
- /**
- * Record the fact that delay has stabilised If delay has stablised for REPORTS_WITHOUT_CHANGE then it will
- * output Delay stabilised
- */
- private void delayStable()
- {
- _delayShifting--;
-
- if (_delayShifting < 0)
- {
- _delayShifting = 0;
- console.debug("Delay stabilised:" + _delay);
- }
- }
-
- /**
- * Checks that the client has received enough messages. If the client has fallen behind then they are put in the
- * _slowClients lists which will increase the delay.
- *
- * @param client The client identifier to check
- * @param received the number of messages received by that client
- * @param batchNumber
- */
- private void recordSlow(String client, long received, int batchNumber)
- {
- if (Math.abs(batchNumber - (_sent / _batchSize)) > _batchVariance)
- {
- _slowClients.put(client, received);
- }
- else
- {
- _slowClients.remove(client);
- }
- }
-
- /** Incrment the number of sent messages and then sleep, if required. */
- public void sentMessage()
- {
-
- _sent++;
-
- if (_delay > (TEN_MILLI_SEC * _batchSize))
- {
- long batchDelay = _delay / _batchSize;
- // less than 10ms sleep doesn't always work.
- // _delay is in nano seconds
- // if (batchDelay < (TEN_MILLI_SEC))
- // {
- // sleep(0, (int) batchDelay);
- // }
- // else
- {
- // if (batchDelay < 30000000000L)
- {
- sleepLong(batchDelay);
- }
- }
- }
- else
- {
- if (SLEEP_PER_MESSAGE && (_delay > 0))
- {
- sleepLong(_delay / _batchSize);
- }
- }
- }
-
- /**
- * Check at the end of each batch and pause sending messages to allow slow clients to catch up.
- *
- * @return true if there were slow clients that caught up.
- */
- private boolean checkForSlowClients()
- {
- // This will allways be true as we are running this at the end of each batchSize
- // if (_sent % _batchSize == 0)
- {
- // Cause test to pause when we have slow
- if (!_slowClients.isEmpty() || NO_CLIENTS)
- {
-
- while (!_slowClients.isEmpty())
- {
- if (log.isInfoEnabled() && ((_sent / _batchSize % DEBUG_LOG_UPATE_INTERVAL) == 0))
- {
- String clients = "";
- Iterator it = _slowClients.keySet().iterator();
- while (it.hasNext())
- {
- clients += it.next();
- if (it.hasNext())
- {
- clients += ", ";
- }
- }
-
- log.info("Pausing for slow clients:" + clients);
- }
-
- if (console.isDebugEnabled() && ((_sent / _batchSize % LOG_UPATE_INTERVAL) == 0))
- {
- console.debug(_slowClients.size() + " slow clients.");
- }
-
- sleep(PAUSE_SLEEP);
- }
-
- if (NO_CLIENTS)
- {
- sleep(NO_CLIENT_SLEEP);
- }
-
- log.debug("Continuing");
-
- return true;
- }
- else
- {
- if ((_sent / _batchSize % LOG_UPATE_INTERVAL) == 0)
- {
- console.info("Total Delay :" + _delay + " "
- + ((_delayShifting == 0) ? "Stablised" : ("Not Stablised(" + _delayShifting + ")")));
- }
- }
-
- }
-
- return false;
- }
-
- /**
- * Sleep normally takes micro-seconds this allows the use of a nano-second value.
- *
- * @param delay nanoseconds to sleep for.
- */
- private void sleepLong(long delay)
- {
- sleep(delay / 1000000, (int) (delay % 1000000));
- }
-
- /**
- * Sleep for the specified micro-seconds.
- * @param sleep microseconds to sleep for.
- */
- private void sleep(long sleep)
- {
- sleep(sleep, 0);
- }
-
- /**
- * Perform the sleep , swallowing any InteruptException.
- *
- * NOTE: If a sleep request is > 10s then reset only sleep for 5s
- *
- * @param milli to sleep for
- * @param nano sub miliseconds to sleep for
- */
- private void sleep(long milli, int nano)
- {
- try
- {
- log.debug("Sleep:" + milli + ":" + nano);
- if (milli > 10000)
- {
-
- if (_delay == milli)
- {
- _totalDuration = _totalReceived / _batchSize * EXPECTED_TIME_PER_BATCH;
- log.error("Sleeping for more than 10 seconds adjusted to 5s!:" + (milli / 1000)
- + "s. Reset _totalDuration:" + _totalDuration);
- }
- else
- {
- log.error("Sleeping for more than 10 seconds adjusted to 5s!:" + (milli / 1000) + "s");
- }
-
- milli = 5000;
- }
-
- Thread.sleep(milli, nano);
- }
- catch (InterruptedException e)
- {
- //
- }
- }
-
- public void setClient(SustainedClientTestCase client)
- {
- _client = client;
- }
- }
-
-}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java
deleted file mode 100644
index 36f9b4eaf1..0000000000
--- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.sustained;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.test.framework.distributedtesting.DistributedTestCase;
-import org.apache.qpid.test.framework.DropInTest;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-import java.util.Properties;
-
-/**
- * SustainedTestCase is a {@link org.apache.qpid.test.framework.distributedtesting.DistributedTestCase} that runs the "Perf_SustainedPubSub" test case. This consists of one
- * test client sending, and several receiving, and attempts to find the highest rate at which messages can be broadcast
- * to the receivers. It is also a {@link DropInTest} to which more test clients may be added during a test run.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td>
- * </table>
- */
-public class SustainedTestCase extends DistributedTestCase implements DropInTest
-{
- /** Used for debugging. */
- Logger log = Logger.getLogger(SustainedTestCase.class);
-
- /** Holds the root name of the topic on which to send the test messages. */
- private static final String SUSTAINED_KEY = "Perf_SustainedPubSub";
-
- /**
- * Creates a new coordinating test case with the specified name.
- *
- * @param name The test case name.
- */
- public SustainedTestCase(String name)
- {
- super(name);
- }
-
- /**
- * Performs a single test run of the sustained test.
- *
- * @throws Exception Any exceptions are allowed to fall through and fail the test.
- */
- public void testBasicPubSub() throws Exception
- {
- log.debug("public void testSinglePubSubCycle(): called");
-
- Properties testConfig = new Properties();
- testConfig.put("TEST_NAME", "Perf_SustainedPubSub");
- testConfig.put("SUSTAINED_KEY", SUSTAINED_KEY);
- testConfig.put("SUSTAINED_NUM_RECEIVERS", Integer.getInteger("numReceives", 2));
- testConfig.put("SUSTAINED_UPDATE_INTERVAL", Integer.getInteger("batchSize", 1000));
- testConfig.put("SUSTAINED_UPDATE_KEY", SUSTAINED_KEY + ".UPDATE");
- testConfig.put("ACKNOWLEDGE_MODE", Integer.getInteger("ackMode", AMQSession.AUTO_ACKNOWLEDGE));
-
- log.info("Created Config: " + testConfig.entrySet().toArray());
-
- getTestSequencer().sequenceTest(null, null, testConfig);
- }
-
- /**
- * Accepts a late joining client into this test case. The client will be enlisted with a control message
- * with the 'CONTROL_TYPE' field set to the value 'LATEJOIN'. It should also provide values for the fields:
- *
- * <p/><table>
- * <tr><td> CLIENT_NAME <td> A unique name for the new client.
- * <tr><td> CLIENT_PRIVATE_CONTROL_KEY <td> The key for the route on which the client receives its control messages.
- * </table>
- *
- * @param message The late joiners join message.
- *
- * @throws JMSException Any JMS Exception are allowed to fall through, indicating that the join failed.
- */
- public void lateJoin(Message message) throws JMSException
- {
- throw new RuntimeException("Not implemented.");
- /*
- // Extract the joining clients details from its join request message.
- TestClientDetails clientDetails = new TestClientDetails();
- clientDetails.clientName = message.getStringProperty("CLIENT_NAME");
- clientDetails.privateControlKey = message.getStringProperty("CLIENT_PRIVATE_CONTROL_KEY");
-
- // Register the joining client, but do block for confirmation as cannot do a synchronous receivers during this
- // method call, as it may have been called from an 'onMessage' method.
- assignReceiverRole(clientDetails, new Properties(), false);
- */
- }
-
- /**
- * Should provide a translation from the junit method name of a test to its test case name as known to the test
- * clients that will run the test. The purpose of this is to convert the JUnit method name into the correct test
- * case name to place into the test invite. For example the method "testP2P" might map onto the interop test case
- * name "TC2_BasicP2P".
- *
- * @param methodName The name of the JUnit test method.
- *
- * @return The name of the corresponding interop test case.
- */
- public String getTestCaseNameForTestMethod(String methodName)
- {
- return "Perf_SustainedPubSub";
- }
-}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
index 79707bafa5..6f2089290a 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
@@ -425,7 +425,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
class SustainedRateAdapter implements MessageListener, Runnable
{
private SustainedTestClient _client;
- private long _batchVariance = Integer.getInteger("batchVariance", 3); //no. batches to allow drifting
+ private long _batchVariance = 3; //no. batches to allow drifting
private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms)
private volatile long _delay; //in nanos
private long _sent;
@@ -434,11 +434,9 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
private static final long NO_CLIENT_SLEEP = 1000; // 1s
private volatile boolean NO_CLIENTS = true;
private int _delayShifting;
- private final int REPORTS_WITHOUT_CHANGE = Integer.getInteger("stableReportCount", 5);
+ private static final int REPORTS_WITHOUT_CHANGE = 5;
private boolean _warmedup = false;
private static final long EXPECTED_TIME_PER_BATCH = 100000L;
- private int _warmUpBatches = Integer.getInteger("warmUpBatches", 10);
-
SustainedRateAdapter(SustainedTestClient client)
{
@@ -495,6 +493,8 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
CountDownLatch _warmup = new CountDownLatch(1);
+ int _warmUpBatches = Integer.getInteger("warmUpBatches", 10);
+
int _numBatches = 10000;
// long[] _timings = new long[_numBatches];
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
new file mode 100644
index 0000000000..0075e45a8c
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.sustained;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.interop.coordinator.ListeningCoordinatorTest;
+import org.apache.qpid.interop.coordinator.TestClientDetails;
+import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase3BasicPubSub;
+import org.apache.qpid.util.ConversationFactory;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub implements ListeningCoordinatorTest
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(SustainedTestCoordinator.class);
+ private List<TestClientDetails> _receivers;
+ private static final String SUSTAINED_KEY = "Perf_SustainedPubSub";
+ Map<String, Object> _testProperties;
+
+ /**
+ * Creates a new coordinating test case with the specified name.
+ *
+ * @param name The test case name.
+ */
+ public SustainedTestCoordinator(String name)
+ {
+ super(name);
+ _receivers = new LinkedList();
+ }
+
+ /**
+ * Adds a receiver to this test.
+ *
+ * @param receiver The contact details of the sending client in the test.
+ */
+ public void setReceiver(TestClientDetails receiver)
+ {
+ _receivers.add(receiver);
+ }
+
+
+ /**
+ * Performs the a single test run
+ *
+ * @throws Exception if there was a problem running the test.
+ */
+ public void testBasicPubSub() throws Exception
+ {
+ log.debug("public void testSinglePubSubCycle(): called");
+
+ Map<String, Object> testConfig = new HashMap<String, Object>();
+ testConfig.put("TEST_NAME", "Perf_SustainedPubSub");
+ testConfig.put("SUSTAINED_KEY", SUSTAINED_KEY);
+ testConfig.put("SUSTAINED_NUM_RECEIVERS", Integer.getInteger("numReceives", 2));
+ testConfig.put("SUSTAINED_UPDATE_INTERVAL", Integer.getInteger("batchSize", 1000));
+ testConfig.put("SUSTAINED_UPDATE_KEY", SUSTAINED_KEY + ".UPDATE");
+ testConfig.put("ACKNOWLEDGE_MODE", Integer.getInteger("ackMode", AMQSession.AUTO_ACKNOWLEDGE));
+
+ log.info("Created Config: " + testConfig.entrySet().toArray());
+
+ sequenceTest(testConfig);
+ }
+
+ /**
+ * Holds a test coordinating conversation with the test clients. This is the basic implementation of the inner loop
+ * of Use Case 5. It consists of assigning the test roles, begining the test and gathering the test reports from the
+ * participants.
+ *
+ * @param testProperties The test case definition.
+ *
+ * @return The test results from the senders and receivers.
+ *
+ * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ protected Message[] sequenceTest(Map<String, Object> testProperties) throws JMSException
+ {
+ log.debug("protected Message[] sequenceTest(Object... testProperties = " + testProperties + "): called");
+
+ Session session = conversationFactory.getSession();
+ Destination senderControlTopic = session.createTopic(sender.privateControlKey);
+
+ ConversationFactory.Conversation senderConversation = conversationFactory.startConversation();
+
+ // Assign the sender role to the sending test client.
+ Message assignSender = conversationFactory.getSession().createMessage();
+ setPropertiesOnMessage(assignSender, testProperties);
+ assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
+ assignSender.setStringProperty("ROLE", "SENDER");
+ assignSender.setStringProperty("CLIENT_NAME", "Sustained_SENDER");
+
+ senderConversation.send(senderControlTopic, assignSender);
+
+ //Assign and wait for the receiver ckuebts to be ready.
+ _testProperties = testProperties;
+
+ // Wait for the senders to confirm their roles.
+ senderConversation.receive();
+
+ assignReceivers();
+
+ // Start the test.
+ Message start = session.createMessage();
+ start.setStringProperty("CONTROL_TYPE", "START");
+
+ senderConversation.send(senderControlTopic, start);
+
+ // Wait for the test sender to return its report.
+ Message senderReport = senderConversation.receive();
+
+ try
+ {
+ Thread.sleep(500);
+ }
+ catch (InterruptedException e)
+ {
+ }
+
+ // Ask the receiver for its report.
+ Message statusRequest = session.createMessage();
+ statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST");
+
+
+ return new Message[]{senderReport};
+ }
+
+ private void assignReceivers()
+ {
+ for (TestClientDetails receiver : _receivers)
+ {
+ registerReceiver(receiver);
+ }
+ }
+
+ private void registerReceiver(TestClientDetails receiver)
+ {
+ log.info("registerReceiver called for receiver:" + receiver);
+ try
+ {
+ Session session = conversationFactory.getSession();
+ Destination receiverControlTopic = session.createTopic(receiver.privateControlKey);
+ ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation();
+ // Assign the receiver role the receiving client.
+ Message assignReceiver = session.createMessage();
+ setPropertiesOnMessage(assignReceiver, _testProperties);
+ assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
+ assignReceiver.setStringProperty("ROLE", "RECEIVER");
+ assignReceiver.setStringProperty("CLIENT_NAME", "Sustained_RECEIVER_" + receiver.clientName);
+
+ receiverConversation.send(receiverControlTopic, assignReceiver);
+
+ //Don't wait for receiver to be ready.... we can't this is being done in
+ // the dispatcher thread, and most likely the acceptance message we
+ // want is sitting in the Dispatcher._queue waiting its turn for being
+ // dispatched so if we block here we won't can't get the message.
+ // So assume consumer is ready for action.
+ //receiverConversation.receive();
+ }
+ catch (JMSException e)
+ {
+ log.warn("Unable to assign receiver:" + receiver + ". Due to:" + e.getMessage());
+ }
+ }
+
+ public void latejoin(Message message)
+ {
+ try
+ {
+
+ TestClientDetails clientDetails = new TestClientDetails();
+ clientDetails.clientName = message.getStringProperty("CLIENT_NAME");
+ clientDetails.privateControlKey = message.getStringProperty("CLIENT_PRIVATE_CONTROL_KEY");
+
+
+ registerReceiver(clientDetails);
+ }
+ catch (JMSException e)
+ {
+ //swallow
+ }
+ }
+
+ /**
+ * Should provide a translation from the junit method name of a test to its test case name as defined in the interop
+ * testing specification. For example the method "testP2P" might map onto the interop test case name
+ * "TC2_BasicP2P".
+ *
+ * @param methodName The name of the JUnit test method.
+ *
+ * @return The name of the corresponding interop test case.
+ */
+ public String getTestCaseNameForTestMethod(String methodName)
+ {
+ return "Perf_SustainedPubSub";
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java
new file mode 100644
index 0000000000..44fc090410
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.sustained;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.interop.testclient.InteropClientTestCase;
+import org.apache.qpid.util.CommandLineParser;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+
+public class TestClient extends org.apache.qpid.interop.testclient.TestClient
+{
+ private static Logger log = Logger.getLogger(TestClient.class);
+
+ /**
+ * Creates a new interop test client, listenting to the specified broker and virtual host, with the specified client
+ * identifying name.
+ *
+ * @param brokerUrl The url of the broker to connect to.
+ * @param virtualHost The virtual host to conect to.
+ * @param clientName The client name to use.
+ */
+ public TestClient(String brokerUrl, String virtualHost, String clientName)
+ {
+ super(brokerUrl, virtualHost, clientName);
+ }
+
+ /**
+ * The entry point for the interop test coordinator. This client accepts the following command line arguments:
+ *
+ * <p/><table> <tr><td> -b <td> The broker URL. <td> Optional. <tr><td> -h <td> The virtual
+ * host. <td> Optional. <tr><td> -n <td> The test client name. <td> Optional. <tr><td> name=value <td>
+ * Trailing argument define name/value pairs. Added to system properties. <td> Optional. </table>
+ *
+ * @param args The command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ // Use the command line parser to evaluate the command line.
+ CommandLineParser commandLine =
+ new CommandLineParser(
+ new String[][]
+ {
+ {"b", "The broker URL.", "broker", "false"},
+ {"h", "The virtual host to use.", "virtual host", "false"},
+ {"n", "The test client name.", "name", "false"},
+ {"j", "Join this test client to running test.", "join", ""}
+ });
+
+ // Capture the command line arguments or display errors and correct usage and then exit.
+ Properties options = null;
+
+ try
+ {
+ options = commandLine.parseCommandLine(args);
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.out.println(commandLine.getErrors());
+ System.out.println(commandLine.getUsage());
+ System.exit(1);
+ }
+
+ // Extract the command line options.
+ String brokerUrl = options.getProperty("b");
+ String virtualHost = options.getProperty("h");
+ String clientName = options.getProperty("n");
+ String join = options.getProperty("j");
+
+ // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up
+ // overridden values from there.
+ commandLine.addCommandLineToSysProperties();
+
+ // Create a test client and start it running.
+ TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName);
+
+ // Use a class path scanner to find all the interop test case implementations.
+ Collection<Class<? extends InteropClientTestCase>> testCaseClasses =
+ new ArrayList<Class<? extends InteropClientTestCase>>();
+ // ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true);
+ // Hard code the test classes till the classpath scanner is fixed.
+ Collections.addAll(testCaseClasses,
+ SustainedTestClient.class);
+
+
+ try
+ {
+ client.start(testCaseClasses, join);
+ }
+ catch (Exception e)
+ {
+ log.error("The test client was unable to start.", e);
+ System.exit(1);
+ }
+ }
+
+ protected void start(Collection<Class<? extends InteropClientTestCase>> testCaseClasses, String join) throws JMSException, ClassNotFoundException
+ {
+ super.start(testCaseClasses);
+ log.debug("private void start(): called");
+
+ if (join != null && !join.equals(""))
+ {
+ Message latejoin = session.createMessage();
+
+ try
+ {
+ Object test = Class.forName(join).newInstance();
+ if (test instanceof InteropClientTestCase)
+ {
+ currentTestCase = (InteropClientTestCase) test;
+ }
+ else
+ {
+ throw new RuntimeException("Requested to join class '" + join + "' but this is not a InteropClientTestCase.");
+ }
+
+ latejoin.setStringProperty("CONTROL_TYPE", "LATEJOIN");
+ latejoin.setStringProperty("CLIENT_NAME", clientName);
+ latejoin.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName);
+ producer.send(session.createTopic("iop.control.test." + currentTestCase.getName()), latejoin);
+ }
+ catch (InstantiationException e)
+ {
+ log.warn("Unable to request latejoining of test:" + currentTestCase);
+ }
+ catch (IllegalAccessException e)
+ {
+ log.warn("Unable to request latejoining of test:" + currentTestCase);
+ }
+ }
+ }
+
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java
new file mode 100644
index 0000000000..7e12fe39fb
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.sustained;
+
+import org.apache.qpid.interop.coordinator.Coordinator;
+import org.apache.qpid.interop.coordinator.ListeningTestDecorator;
+import org.apache.qpid.interop.coordinator.TestClientDetails;
+import org.apache.qpid.util.CommandLineParser;
+import org.apache.qpid.util.ConversationFactory;
+import org.apache.log4j.Logger;
+
+import java.util.Properties;
+import java.util.Set;
+
+import junit.framework.TestResult;
+import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
+
+import javax.jms.Connection;
+
+public class TestCoordinator extends Coordinator
+{
+
+ private static final Logger log = Logger.getLogger(TestCoordinator.class);
+
+ /**
+ * Creates an interop test coordinator on the specified broker and virtual host.
+ *
+ * @param brokerUrl The URL of the broker to connect to.
+ * @param virtualHost The virtual host to run all tests on. Optional, may be <tt>null</tt>.
+ */
+ TestCoordinator(String brokerUrl, String virtualHost)
+ {
+ super(brokerUrl, virtualHost);
+ }
+
+ protected WrappedSuiteTestDecorator newTestDecorator(WrappedSuiteTestDecorator targetTest, Set<TestClientDetails> enlistedClients, ConversationFactory conversationFactory, Connection connection)
+ {
+ return new ListeningTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
+ }
+
+
+ /**
+ * The entry point for the interop test coordinator. This client accepts the following command line arguments:
+ *
+ * <p/><table> <tr><td> -b <td> The broker URL. <td> Mandatory. <tr><td> -h <td> The virtual host.
+ * <td> Optional. <tr><td> name=value <td> Trailing argument define name/value pairs. Added to system properties.
+ * <td> Optional. </table>
+ *
+ * @param args The command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ try
+ {
+ // Use the command line parser to evaluate the command line with standard handling behaviour (print errors
+ // and usage then exist if there are errors).
+ Properties options =
+ CommandLineParser.processCommandLine(args,
+ new CommandLineParser(
+ new String[][]
+ {
+ {"b", "The broker URL.", "broker", "false"},
+ {"h", "The virtual host to use.", "virtual host", "false"},
+ {"o", "The name of the directory to output test timings to.", "dir", "false"}
+ }));
+
+ // Extract the command line options.
+ String brokerUrl = options.getProperty("b");
+ String virtualHost = options.getProperty("h");
+ String reportDir = options.getProperty("o");
+ reportDir = (reportDir == null) ? "." : reportDir;
+
+
+ String[] testClassNames = {SustainedTestCoordinator.class.getName()};
+
+ // Create a coordinator and begin its test procedure.
+ Coordinator coordinator = new TestCoordinator(brokerUrl, virtualHost);
+
+ coordinator.setReportDir(reportDir);
+
+ TestResult testResult = coordinator.start(testClassNames);
+
+ if (testResult.failureCount() > 0)
+ {
+ System.exit(FAILURE_EXIT);
+ }
+ else
+ {
+ System.exit(SUCCESS_EXIT);
+ }
+ }
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ log.error("Top level handler caught execption.", e);
+ System.exit(EXCEPTION_EXIT);
+ }
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java
new file mode 100644
index 0000000000..bad49060ca
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java
@@ -0,0 +1,234 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.io.File;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+
+/**
+ * An ClasspathScanner scans the classpath for classes that implement an interface or extend a base class and have names
+ * that match a regular expression.
+ *
+ * <p/>In order to test whether a class implements an interface or extends a class, the class must be loaded (unless
+ * the class files were to be scanned directly). Using this collector can cause problems when it scans the classpath,
+ * because loading classes will initialize their statics, which in turn may cause undesired side effects. For this
+ * reason, the collector should always be used with a regular expression, through which the class file names are
+ * filtered, and only those that pass this filter will be tested. For example, if you define tests in classes that
+ * end with the keyword "Test" then use the regular expression "Test$" to match this.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Find all classes matching type and name pattern on the classpath.
+ * </table>
+ *
+ * @todo Add logic to scan jars as well as directories.
+ */
+public class ClasspathScanner
+{
+ private static final Logger log = Logger.getLogger(ClasspathScanner.class);
+
+ /**
+ * Scans the classpath and returns all classes that extend a specified class and match a specified name.
+ * There is an flag that can be used to indicate that only Java Beans will be matched (that is, only those classes
+ * that have a default constructor).
+ *
+ * @param matchingClass The class or interface to match.
+ * @param matchingRegexp The regular expression to match against the class name.
+ * @param beanOnly Flag to indicate that onyl classes with default constructors should be matched.
+ *
+ * @return All the classes that match this collector.
+ */
+ public static <T> Collection<Class<? extends T>> getMatches(Class<T> matchingClass, String matchingRegexp,
+ boolean beanOnly)
+ {
+ log.debug("public static <T> Collection<Class<? extends T>> getMatches(Class<T> matchingClass = " + matchingClass
+ + ", String matchingRegexp = " + matchingRegexp + ", boolean beanOnly = " + beanOnly + "): called");
+
+ // Build a compiled regular expression from the pattern to match.
+ Pattern matchPattern = Pattern.compile(matchingRegexp);
+
+ String classPath = System.getProperty("java.class.path");
+ Map<String, Class<? extends T>> result = new HashMap<String, Class<? extends T>>();
+
+ log.debug("classPath = " + classPath);
+
+ // Find matching classes starting from all roots in the classpath.
+ for (String path : splitClassPath(classPath))
+ {
+ gatherFiles(new File(path), "", result, matchPattern, matchingClass);
+ }
+
+ return result.values();
+ }
+
+ /**
+ * Finds all matching classes rooted at a given location in the file system. If location is a directory it
+ * is recursively examined.
+ *
+ * @param classRoot The root of the current point in the file system being examined.
+ * @param classFileName The name of the current file or directory to examine.
+ * @param result The accumulated mapping from class names to classes that match the scan.
+ *
+ * @todo Recursion ok as file system depth is not likely to exhaust the stack. Might be better to replace with
+ * iteration.
+ */
+ private static <T> void gatherFiles(File classRoot, String classFileName, Map<String, Class<? extends T>> result,
+ Pattern matchPattern, Class<? extends T> matchClass)
+ {
+ log.debug("private static <T> void gatherFiles(File classRoot = " + classRoot + ", String classFileName = "
+ + classFileName + ", Map<String, Class<? extends T>> result, Pattern matchPattern = " + matchPattern
+ + ", Class<? extends T> matchClass = " + matchClass + "): called");
+
+ File thisRoot = new File(classRoot, classFileName);
+
+ // If the current location is a file, check if it is a matching class.
+ if (thisRoot.isFile())
+ {
+ // Check that the file has a matching name.
+ if (matchesName(thisRoot.getName(), matchPattern))
+ {
+ String className = classNameFromFile(thisRoot.getName());
+
+ // Check that the class has matching type.
+ try
+ {
+ Class<?> candidateClass = Class.forName(className);
+
+ Class matchedClass = matchesClass(candidateClass, matchClass);
+
+ if (matchedClass != null)
+ {
+ result.put(className, matchedClass);
+ }
+ }
+ catch (ClassNotFoundException e)
+ {
+ // Ignore this. The matching class could not be loaded.
+ log.debug("Got ClassNotFoundException, ignoring.", e);
+ }
+ }
+
+ return;
+ }
+ // Otherwise the current location is a directory, so examine all of its contents.
+ else
+ {
+ String[] contents = thisRoot.list();
+
+ if (contents != null)
+ {
+ for (String content : contents)
+ {
+ gatherFiles(classRoot, classFileName + File.separatorChar + content, result, matchPattern, matchClass);
+ }
+ }
+ }
+ }
+
+ /**
+ * Checks if the specified class file name corresponds to a class with name matching the specified regular expression.
+ *
+ * @param classFileName The class file name.
+ * @param matchPattern The regular expression pattern to match.
+ *
+ * @return <tt>true</tt> if the class name matches, <tt>false</tt> otherwise.
+ */
+ private static boolean matchesName(String classFileName, Pattern matchPattern)
+ {
+ String className = classNameFromFile(classFileName);
+ Matcher matcher = matchPattern.matcher(className);
+
+ return matcher.matches();
+ }
+
+ /**
+ * Checks if the specified class to compare extends the base class being scanned for.
+ *
+ * @param matchingClass The base class to match against.
+ * @param toMatch The class to match against the base class.
+ *
+ * @return The class to check, cast as an instance of the class to match if the class extends the base class, or
+ * <tt>null</tt> otherwise.
+ */
+ private static <T> Class<? extends T> matchesClass(Class<?> matchingClass, Class<? extends T> toMatch)
+ {
+ try
+ {
+ return matchingClass.asSubclass(toMatch);
+ }
+ catch (ClassCastException e)
+ {
+ return null;
+ }
+ }
+
+ /**
+ * Takes a classpath (which is a series of paths) and splits it into its component paths.
+ *
+ * @param classPath The classpath to split.
+ *
+ * @return A list of the component paths that make up the class path.
+ */
+ private static List<String> splitClassPath(String classPath)
+ {
+ List<String> result = new LinkedList<String>();
+ String separator = System.getProperty("path.separator");
+ StringTokenizer tokenizer = new StringTokenizer(classPath, separator);
+
+ while (tokenizer.hasMoreTokens())
+ {
+ result.add(tokenizer.nextToken());
+ }
+
+ return result;
+ }
+
+ /**
+ * Translates from the filename of a class to its fully qualified classname. Files are named using forward slash
+ * seperators and end in ".class", whereas fully qualified class names use "." sperators and no ".class" ending.
+ *
+ * @param classFileName The filename of the class to translate to a class name.
+ *
+ * @return The fully qualified class name.
+ */
+ private static String classNameFromFile(String classFileName)
+ {
+ log.debug("private static String classNameFromFile(String classFileName = " + classFileName + "): called");
+
+ // Remove the .class ending.
+ String s = classFileName.substring(0, classFileName.length() - ".class".length());
+
+ // Turn / seperators in . seperators.
+ String s2 = s.replace(File.separatorChar, '.');
+
+ // Knock off any leading . caused by a leading /.
+ if (s2.startsWith("."))
+ {
+ return s2.substring(1);
+ }
+
+ return s2;
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java
new file mode 100644
index 0000000000..0090bec3d0
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java
@@ -0,0 +1,479 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import org.apache.log4j.Logger;
+
+import javax.jms.*;
+
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation
+ * over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant
+ * on the queue implementation). Clients of this de-multiplexer can wait on messages, defined by message correlation ids.
+ *
+ * <p/>One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a
+ * multiplexed messaging route. This can be usefull, as JMS sessions are not multi-threaded. Setting up the conversation
+ * with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order
+ * governed by the asynchronous message flow. For example, something like the following code could run a multi-threaded
+ * conversation (the conversation methods can be called many times in parallel):
+ *
+ * <p/><pre>
+ * class Initiator
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, null,
+ * java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * initiateConversation()
+ * {
+ * try {
+ * // Exchange greetings.
+ * conversation.send(sendDestination, conversation.getSession().createTextMessage("Hello."));
+ * Message greeting = conversation.receive();
+ *
+ * // Exchange goodbyes.
+ * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ * Message goodbye = conversation.receive();
+ * } finally {
+ * conversation.end();
+ * }
+ * }
+ * }
+ *
+ * class Responder
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, receiveDestination,
+ * java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * respondToConversation()
+ * {
+ * try {
+ * // Exchange greetings.
+ * Message greeting = conversation.receive();
+ * conversation.send(conversation.getSession().createTextMessage("Hello."));
+ *
+ * // Exchange goodbyes.
+ * Message goodbye = conversation.receive();
+ * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ * } finally {
+ * conversation.end();
+ * }
+ * }
+ * }
+ * </pre>
+ *
+ * <p/>Conversation correlation id's are generated on a per thread basis.
+ *
+ * <p/>The same session is shared amongst all conversations. Calls to send are therefore synchronized because JMS
+ * sessions are not multi-threaded.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><th> Associate messages to an ongoing conversation using correlation ids.
+ * <tr><td> Auto manage sessions for conversations.
+ * <tr><td> Store messages not in a conversation in dead letter box.
+ * </table>
+ */
+public class ConversationFactory
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(ConversationFactory.class);
+
+ /** Holds a map from correlation id's to queues. */
+ private Map<Long, BlockingQueue<Message>> idsToQueues = new HashMap<Long, BlockingQueue<Message>>();
+
+ /** Holds the connection over which the conversation is conducted. */
+ private Connection connection;
+
+ /** Holds the session over which the conversation is conduxted. */
+ private Session session;
+
+ /** The message consumer for incoming messages. */
+ MessageConsumer consumer;
+
+ /** The message producer for outgoing messages. */
+ MessageProducer producer;
+
+ /** The well-known or temporary destination to receive replies on. */
+ Destination receiveDestination;
+
+ /** Holds the queue implementation class for the reply queue. */
+ Class<? extends BlockingQueue> queueClass;
+
+ /** Used to hold any replies that are received outside of the context of a conversation. */
+ BlockingQueue<Message> deadLetterBox = new LinkedBlockingQueue<Message>();
+
+ /* Used to hold conversation state on a per thread basis. */
+ /*
+ ThreadLocal<Conversation> threadLocals =
+ new ThreadLocal<Conversation>()
+ {
+ protected Conversation initialValue()
+ {
+ Conversation settings = new Conversation();
+ settings.conversationId = conversationIdGenerator.getAndIncrement();
+
+ return settings;
+ }
+ };
+ */
+
+ /** Generates new coversation id's as needed. */
+ AtomicLong conversationIdGenerator = new AtomicLong();
+
+ /**
+ * Creates a conversation helper on the specified connection with the default sending destination, and listening
+ * to the specified receiving destination.
+ *
+ * @param connection The connection to build the conversation helper on.
+ * @param receiveDestination The destination to listen to for incoming messages. This may be null to use a temporary
+ * queue.
+ * @param queueClass The queue implementation class.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ public ConversationFactory(Connection connection, Destination receiveDestination,
+ Class<? extends BlockingQueue> queueClass) throws JMSException
+ {
+ log.debug("public ConversationFactory(Connection connection, Destination receiveDestination = " + receiveDestination
+ + ", Class<? extends BlockingQueue> queueClass = " + queueClass + "): called");
+
+ this.connection = connection;
+ this.queueClass = queueClass;
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Check if a well-known receive destination has been provided, or use a temporary queue if not.
+ this.receiveDestination = (receiveDestination != null) ? receiveDestination : session.createTemporaryQueue();
+
+ consumer = session.createConsumer(receiveDestination);
+ producer = session.createProducer(null);
+
+ consumer.setMessageListener(new Receiver());
+ }
+
+ /**
+ * Creates a new conversation context.
+ *
+ * @return A new conversation context.
+ */
+ public Conversation startConversation()
+ {
+ log.debug("public Conversation startConversation(): called");
+
+ Conversation conversation = new Conversation();
+ conversation.conversationId = conversationIdGenerator.getAndIncrement();
+
+ return conversation;
+ }
+
+ /**
+ * Ensures that the reply queue for a conversation exists.
+ *
+ * @param conversationId The conversation correlation id.
+ */
+ private void initQueueForId(long conversationId)
+ {
+ if (!idsToQueues.containsKey(conversationId))
+ {
+ idsToQueues.put(conversationId, ReflectionUtils.<BlockingQueue>newInstance(queueClass));
+ }
+ }
+
+ /**
+ * Clears the dead letter box, returning all messages that were in it.
+ *
+ * @return All messages in the dead letter box.
+ */
+ public Collection<Message> emptyDeadLetterBox()
+ {
+ log.debug("public Collection<Message> emptyDeadLetterBox(): called");
+
+ Collection<Message> result = new ArrayList<Message>();
+ deadLetterBox.drainTo(result);
+
+ return result;
+ }
+
+ /**
+ * Gets the session over which the conversation is conducted.
+ *
+ * @return The session over which the conversation is conducted.
+ */
+ public Session getSession()
+ {
+ // Conversation settings = threadLocals.get();
+
+ return session;
+ }
+
+ /**
+ * Used to hold a conversation context. This consists of a correlating id for the conversation, and a reply
+ * destination automatically updated to the last received reply-to destination.
+ */
+ public class Conversation
+ {
+ /** Holds the correlation id for the context. */
+ long conversationId;
+
+ /**
+ * Holds the send destination for the context. This will automatically be updated to the most recently received
+ * reply-to destination.
+ */
+ Destination sendDestination;
+
+ /**
+ * Sends a message to the default sending location. The correlation id of the message will be assigned by this
+ * method, overriding any previously set value.
+ *
+ * @param sendDestination The destination to send to. This may be null to use the last received reply-to
+ * destination.
+ * @param message The message to send.
+ *
+ * @throws JMSException All undelying JMSExceptions are allowed to fall through. This will also be thrown if no
+ * send destination is specified and there is no most recent reply-to destination available
+ * to use.
+ */
+ public void send(Destination sendDestination, Message message) throws JMSException
+ {
+ log.debug("public void send(Destination sendDestination = " + sendDestination + ", Message message = " + message
+ + "): called");
+
+ // Conversation settings = threadLocals.get();
+ // long conversationId = conversationId;
+ message.setJMSCorrelationID(Long.toString(conversationId));
+ message.setJMSReplyTo(receiveDestination);
+
+ // Ensure that the reply queue for this conversation exists.
+ initQueueForId(conversationId);
+
+ // Check if an overriding send to destination has been set or use the last reply-to if not.
+ Destination sendTo = null;
+
+ if (sendDestination != null)
+ {
+ sendTo = sendDestination;
+ }
+ else if (sendDestination != null)
+ {
+ sendTo = sendDestination;
+ }
+ else
+ {
+ throw new JMSException("The send destination was specified, and no most recent reply-to available to use.");
+ }
+
+ // Send the message.
+ synchronized (this)
+ {
+ producer.send(sendTo, message);
+ }
+ }
+
+ /**
+ * Gets the next message in an ongoing conversation. This method may block until such a message is received.
+ *
+ * @return The next incoming message in the conversation.
+ *
+ * @throws JMSException All undelying JMSExceptions are allowed to fall through. Thrown if the received message
+ * did not have its reply-to destination set up.
+ */
+ public Message receive() throws JMSException
+ {
+ log.debug("public Message receive(): called");
+
+ // Conversation settings = threadLocals.get();
+ // long conversationId = settings.conversationId;
+
+ // Ensure that the reply queue for this conversation exists.
+ initQueueForId(conversationId);
+
+ BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+
+ try
+ {
+ Message result = queue.take();
+
+ // Keep the reply-to destination to send replies to.
+ sendDestination = result.getJMSReplyTo();
+
+ return result;
+ }
+ catch (InterruptedException e)
+ {
+ return null;
+ }
+ }
+
+ /**
+ * Gets many messages in an ongoing conversation. If a limit is specified, then once that many messages are
+ * received they will be returned. If a timeout is specified, then all messages up to the limit, received within
+ * that timespan will be returned. At least one of the message count or timeout should be set to a value of
+ * 1 or greater.
+ *
+ * @param num The number of messages to receive, or all if this is less than 1.
+ * @param timeout The timeout in milliseconds to receive the messages in, or forever if this is less than 1.
+ *
+ * @return All messages received within the count limit and the timeout.
+ *
+ * @throws JMSException All undelying JMSExceptions are allowed to fall through.
+ */
+ public Collection<Message> receiveAll(int num, long timeout) throws JMSException
+ {
+ log.debug("public Collection<Message> receiveAll(int num = " + num + ", long timeout = " + timeout
+ + "): called");
+
+ // Check that a timeout or message count was set.
+ if ((num < 1) && (timeout < 1))
+ {
+ throw new IllegalArgumentException("At least one of message count (num) or timeout must be set.");
+ }
+
+ // Ensure that the reply queue for this conversation exists.
+ initQueueForId(conversationId);
+ BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+
+ // Used to collect the received messages in.
+ Collection<Message> result = new ArrayList<Message>();
+
+ // Used to indicate when the timeout or message count has expired.
+ boolean receiveMore = true;
+
+ int messageCount = 0;
+
+ // Receive messages until the timeout or message count expires.
+ do
+ {
+ try
+ {
+ Message next = null;
+
+ // Try to receive the message with a timeout if one has been set.
+ if (timeout > 0)
+ {
+ next = queue.poll(timeout, TimeUnit.MILLISECONDS);
+
+ // Check if the timeout expired, and stop receiving if so.
+ if (next == null)
+ {
+ receiveMore = false;
+ }
+ }
+ // Receive the message without a timeout.
+ else
+ {
+ next = queue.take();
+ }
+
+ // Increment the message count if a message was received.
+ messageCount += (next != null) ? 1 : 0;
+
+ // Check if all the requested messages were received, and stop receiving if so.
+ if ((num > 0) && (messageCount >= num))
+ {
+ receiveMore = false;
+ }
+
+ // Keep the reply-to destination to send replies to.
+ sendDestination = (next != null) ? next.getJMSReplyTo() : sendDestination;
+
+ if (next != null)
+ {
+ result.add(next);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Restore the threads interrupted status.
+ Thread.currentThread().interrupt();
+
+ // Stop receiving but return the messages received so far.
+ receiveMore = false;
+ }
+ }
+ while (receiveMore);
+
+ return result;
+ }
+
+ /**
+ * Completes the conversation. Any correlation id's pertaining to the conversation are no longer valid, and any
+ * incoming messages using them will go to the dead letter box.
+ */
+ public void end()
+ {
+ log.debug("public void end(): called");
+
+ // Ensure that the thread local for the current thread is cleaned up.
+ // Conversation settings = threadLocals.get();
+ // long conversationId = settings.conversationId;
+ // threadLocals.remove();
+
+ // Ensure that its queue is removed from the queue map.
+ BlockingQueue<Message> queue = idsToQueues.remove(conversationId);
+
+ // Move any outstanding messages on the threads conversation id into the dead letter box.
+ queue.drainTo(deadLetterBox);
+ }
+ }
+
+ /**
+ * Implements the message listener for this conversation handler.
+ */
+ protected class Receiver implements MessageListener
+ {
+ /**
+ * Handles all incoming messages in the ongoing conversations. These messages are split up by correaltion id
+ * and placed into queues.
+ *
+ * @param message The incoming message.
+ */
+ public void onMessage(Message message)
+ {
+ log.debug("public void onMessage(Message message = " + message + "): called");
+
+ try
+ {
+ Long conversationId = Long.parseLong(message.getJMSCorrelationID());
+
+ // Find the converstaion queue to place the message on. If there is no conversation for the message id,
+ // the the dead letter box queue is used.
+ BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+ queue = (queue == null) ? deadLetterBox : queue;
+
+ queue.put(message);
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/java/integrationtests/src/resources/org/apache/qpid/interop/connection.properties b/java/integrationtests/src/resources/org/apache/qpid/interop/connection.properties
new file mode 100644
index 0000000000..a5fb611dfa
--- /dev/null
+++ b/java/integrationtests/src/resources/org/apache/qpid/interop/connection.properties
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
+connectionfactory.broker = amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672'