summaryrefslogtreecommitdiff
path: root/java/integrationtests
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-05-08 12:04:37 +0000
committerRobert Greig <rgreig@apache.org>2007-05-08 12:04:37 +0000
commit3396c24635dd2dc387bcb23196f241a1d6b8a35f (patch)
tree25f2b6d3927277e89ff88df58d6bcbe6458b457c /java/integrationtests
parent582a91714ddfb865614b6a335a3da2841299e07e (diff)
downloadqpid-python-3396c24635dd2dc387bcb23196f241a1d6b8a35f.tar.gz
Merged revisions 534903-535308,535310-535808,535810-536007,536009-536140,536142-536164 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r534903 | rgreig | 2007-05-03 16:09:18 +0100 (Thu, 03 May 2007) | 1 line More interop test stuff. ........ r535254 | rgreig | 2007-05-04 15:27:53 +0100 (Fri, 04 May 2007) | 1 line First two test cases completed. Still to do pub/sub. ........ r535874 | rgreig | 2007-05-07 14:13:06 +0100 (Mon, 07 May 2007) | 1 line Added remaining test case. ........ r536163 | rgreig | 2007-05-08 12:21:35 +0100 (Tue, 08 May 2007) | 1 line Added XML logging of test results. ........ r536164 | rgreig | 2007-05-08 12:39:51 +0100 (Tue, 08 May 2007) | 1 line Added inclusion of sender and receiver names in results. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@536167 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/integrationtests')
-rw-r--r--java/integrationtests/jar-with-dependencies.xml29
-rw-r--r--java/integrationtests/pom.xml13
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java125
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java136
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java43
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java13
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java2
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java381
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java64
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java31
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java71
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java4
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java105
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java2
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java94
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java224
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java479
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java306
18 files changed, 1722 insertions, 400 deletions
diff --git a/java/integrationtests/jar-with-dependencies.xml b/java/integrationtests/jar-with-dependencies.xml
new file mode 100644
index 0000000000..62978ee864
--- /dev/null
+++ b/java/integrationtests/jar-with-dependencies.xml
@@ -0,0 +1,29 @@
+<!-- This is an assembly descriptor that produces a jar file that contains all the
+ dependencies, fully expanded into a single jar, required to run the tests of
+ a maven project.
+ -->
+<assembly>
+ <id>all-test-deps</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory></outputDirectory>
+ <outputFileNameMapping></outputFileNameMapping>
+ <unpack>true</unpack>
+ <scope>test</scope>
+ </dependencySet>
+ </dependencySets>
+ <fileSets>
+ <fileSet>
+ <directory>target/classes</directory>
+ <outputDirectory></outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>target/test-classes</directory>
+ <outputDirectory></outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/java/integrationtests/pom.xml b/java/integrationtests/pom.xml
index fd7ac8b039..3afdf48204 100644
--- a/java/integrationtests/pom.xml
+++ b/java/integrationtests/pom.xml
@@ -103,6 +103,19 @@
</execution>
</executions>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-SNAPSHOT</version>
+ <configuration>
+ <descriptors>
+ <descriptor>jar-with-dependencies.xml</descriptor>
+ </descriptors>
+ <outputDirectory>target</outputDirectory>
+ <workDirectory>target/assembly/work</workDirectory>
+ </configuration>
+ </plugin>
</plugins>
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
index 36c270ef11..ef69d14be8 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
@@ -22,14 +22,17 @@
package org.apache.qpid.interop.coordinator;
import java.util.Collection;
+import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
-import javax.jms.JMSException;
-import javax.jms.Message;
+import javax.jms.*;
import junit.framework.TestCase;
-import org.apache.qpid.util.ConversationHelper;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.util.ConversationFactory;
/**
* An CoordinatingTestCase is a JUnit test case extension that knows how to coordinate test clients that take part in a
@@ -60,19 +63,24 @@ import org.apache.qpid.util.ConversationHelper;
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Accept notification of test case participants. <td> {@link InvitingTestDecorator}
- * <tr><td> Coordinate the test sequence amongst participants. <td> {@link ConversationHelper}
+ * <tr><td> Accpet JMS Connection to carry out the coordination over.
+ * <tr><td> Coordinate the test sequence amongst participants. <td> {@link ConversationFactory}
* <tr><td> Supply test properties
* </table>
*/
public abstract class CoordinatingTestCase extends TestCase
{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(CoordinatingTestCase.class);
+
/** Holds the contact details for the sending test client. */
TestClientDetails sender;
/** Holds the contact details for the receving test client. */
TestClientDetails receiver;
- ConversationHelper conversation;
+ /** Holds the conversation factory over which to coordinate the test. */
+ ConversationFactory conversationFactory;
/**
* Creates a new coordinating test case with the specified name.
@@ -91,6 +99,8 @@ public abstract class CoordinatingTestCase extends TestCase
*/
public void setSender(TestClientDetails sender)
{
+ log.debug("public void setSender(TestClientDetails sender = " + sender + "): called");
+
this.sender = sender;
}
@@ -101,6 +111,8 @@ public abstract class CoordinatingTestCase extends TestCase
*/
public void setReceiver(TestClientDetails receiver)
{
+ log.debug("public void setReceiver(TestClientDetails receiver = " + receiver + "): called");
+
this.receiver = receiver;
}
@@ -125,6 +137,46 @@ public abstract class CoordinatingTestCase extends TestCase
}
/**
+ * Returns the name of the current test method of this test class, with the sending and receiving client names
+ * appended on to it, so that the resulting name unqiuely identifies the test and the clients that participated
+ * in it.
+ *
+ * @return The unique test and client name.
+ */
+ public String getName()
+ {
+ if ((sender == null) || (receiver == null))
+ {
+ return super.getName();
+ }
+ else
+ {
+ return super.getName() + "_sender_" + sender.clientName + "_receiver_" + receiver.clientName;
+ }
+ }
+
+ /**
+ * Should provide a translation from the junit method name of a test to its test case name as defined in the
+ * interop testing specification. For example the method "testP2P" might map onto the interop test case name
+ * "TC2_BasicP2P".
+ *
+ * @param methodName The name of the JUnit test method.
+ *
+ * @return The name of the corresponding interop test case.
+ */
+ public abstract String getTestCaseNameForTestMethod(String methodName);
+
+ /**
+ * Accepts the conversation factory over which to hold the test coordinating conversation.
+ *
+ * @param conversationFactory The conversation factory to coordinate the test over.
+ */
+ public void setConversationFactory(ConversationFactory conversationFactory)
+ {
+ this.conversationFactory = conversationFactory;
+ }
+
+ /**
* Holds a test coordinating conversation with the test clients. This is the basic implementation of the inner
* loop of Use Case 5. It consists of assigning the test roles, begining the test and gathering the test reports
* from the participants.
@@ -135,44 +187,81 @@ public abstract class CoordinatingTestCase extends TestCase
*
* @throws JMSException All underlying JMSExceptions are allowed to fall through.
*/
- protected Message[] sequenceTest(Properties testProperties) throws JMSException
+ protected Message[] sequenceTest(Map<String, Object> testProperties) throws JMSException
{
+ log.debug("protected Message[] sequenceTest(Object... testProperties = " + testProperties + "): called");
+
+ Session session = conversationFactory.getSession();
+ Destination senderControlTopic = session.createTopic(sender.privateControlKey);
+ Destination receiverControlTopic = session.createTopic(receiver.privateControlKey);
+
+ ConversationFactory.Conversation senderConversation = conversationFactory.startConversation();
+ ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation();
+
// Assign the sender role to the sending test client.
- Message assignSender = conversation.getSession().createMessage();
+ Message assignSender = conversationFactory.getSession().createMessage();
+ setPropertiesOnMessage(assignSender, testProperties);
assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
assignSender.setStringProperty("ROLE", "SENDER");
- conversation.send(assignSender);
+ senderConversation.send(senderControlTopic, assignSender);
// Assign the receiver role the receiving client.
- Message assignReceiver = conversation.getSession().createMessage();
+ Message assignReceiver = session.createMessage();
+ setPropertiesOnMessage(assignReceiver, testProperties);
assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
assignReceiver.setStringProperty("ROLE", "RECEIVER");
- conversation.send(assignReceiver);
+ receiverConversation.send(receiverControlTopic, assignReceiver);
// Wait for the senders and receivers to confirm their roles.
- conversation.receive();
- conversation.receive();
+ senderConversation.receive();
+ receiverConversation.receive();
// Start the test.
- Message start = conversation.getSession().createMessage();
+ Message start = session.createMessage();
start.setStringProperty("CONTROL_TYPE", "START");
- conversation.send(start);
+ senderConversation.send(senderControlTopic, start);
// Wait for the test sender to return its report.
- Message senderReport = conversation.receive();
+ Message senderReport = senderConversation.receive();
+
+ try
+ {
+ Thread.sleep(500);
+ }
+ catch (InterruptedException e)
+ { }
// Ask the receiver for its report.
- Message statusRequest = conversation.getSession().createMessage();
+ Message statusRequest = session.createMessage();
statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST");
- conversation.send(statusRequest);
+ receiverConversation.send(receiverControlTopic, statusRequest);
// Wait for the receiver to send its report.
- Message receiverReport = conversation.receive();
+ Message receiverReport = receiverConversation.receive();
return new Message[] { senderReport, receiverReport };
}
+
+ /**
+ * Sets properties of different types on a JMS Message.
+ *
+ * @param message The message to set properties on.
+ * @param properties The property name/value pairs to set.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ public void setPropertiesOnMessage(Message message, Map<String, Object> properties) throws JMSException
+ {
+ for (Map.Entry<String, Object> entry : properties.entrySet())
+ {
+ String name = entry.getKey();
+ Object value = entry.getValue();
+
+ message.setObjectProperty(name, value);
+ }
+ }
}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java
index 5e0f5b4941..de5faeac0f 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.interop.coordinator;
+import java.io.*;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
@@ -31,18 +32,18 @@ import junit.framework.TestSuite;
import org.apache.log4j.Logger;
+import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase1DummyRun;
import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase2BasicP2P;
-import org.apache.qpid.interop.testclient.InteropClientTestCase;
+import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase3BasicPubSub;
import org.apache.qpid.interop.testclient.TestClient;
-import org.apache.qpid.interop.testclient.testcases.TestCase1DummyRun;
-import org.apache.qpid.interop.testclient.testcases.TestCase2BasicP2P;
-import org.apache.qpid.util.ClasspathScanner;
import org.apache.qpid.util.CommandLineParser;
-import org.apache.qpid.util.ConversationHelper;
+import org.apache.qpid.util.ConversationFactory;
import org.apache.qpid.util.PrettyPrintingUtils;
-import uk.co.thebadgerset.junit.extensions.TestRunnerImprovedErrorHandling;
+import uk.co.thebadgerset.junit.extensions.TKTestResult;
+import uk.co.thebadgerset.junit.extensions.TKTestRunner;
import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
+import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
/**
* <p/>Implements the coordinator client described in the interop testing specification
@@ -51,13 +52,13 @@ import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
*
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Find out what test clients are available. <td> {@link ConversationHelper}
+ * <tr><td> Find out what test clients are available. <td> {@link ConversationFactory}
* <tr><td> Decorate available tests to run all available clients. <td> {@link InvitingTestDecorator}
* <tr><td> Attach XML test result logger.
* <tr><td> Terminate the interop testing framework.
* </table>
*/
-public class Coordinator extends TestRunnerImprovedErrorHandling
+public class Coordinator extends TKTestRunner
{
private static final Logger log = Logger.getLogger(Coordinator.class);
@@ -73,10 +74,21 @@ public class Coordinator extends TestRunnerImprovedErrorHandling
Set<TestClientDetails> enlistedClients = new HashSet<TestClientDetails>();
/** Holds the conversation helper for the control conversation. */
- private ConversationHelper conversation;
+ private ConversationFactory conversationFactory;
+
+ /** Holds the connection that the coordinating messages are sent over. */
private Connection connection;
/**
+ * Holds the name of the class of the test currently being run. Ideally passed into the {@link #createTestResult}
+ * method, but as the signature is already fixed for this, the current value gets pushed here as a member variable.
+ */
+ private String currentTestClassName;
+
+ /** Holds the path of the directory to output test results too, if one is defined. */
+ private static String reportDir;
+
+ /**
* Creates an interop test coordinator on the specified broker and virtual host.
*
* @param brokerUrl The URL of the broker to connect to.
@@ -114,19 +126,26 @@ public class Coordinator extends TestRunnerImprovedErrorHandling
new String[][]
{
{ "b", "The broker URL.", "broker", "false" },
- { "h", "The virtual host to use.", "virtual host", "false" }
+ { "h", "The virtual host to use.", "virtual host", "false" },
+ { "o", "The name of the directory to output test timings to.", "dir", "false" }
}));
// Extract the command line options.
String brokerUrl = options.getProperty("b");
String virtualHost = options.getProperty("h");
+ reportDir = options.getProperty("o");
// Scan for available test cases using a classpath scanner.
Collection<Class<? extends CoordinatingTestCase>> testCaseClasses =
new ArrayList<Class<? extends CoordinatingTestCase>>();
// ClasspathScanner.getMatches(CoordinatingTestCase.class, "^Test.*", true);
// Hard code the test classes till the classpath scanner is fixed.
- Collections.addAll(testCaseClasses, new Class[] { CoordinatingTestCase2BasicP2P.class });
+ Collections.addAll(testCaseClasses,
+ new Class[]
+ {
+ CoordinatingTestCase1DummyRun.class, CoordinatingTestCase2BasicP2P.class,
+ CoordinatingTestCase3BasicPubSub.class
+ });
// Check that some test classes were actually found.
if ((testCaseClasses == null) || testCaseClasses.isEmpty())
@@ -145,9 +164,12 @@ public class Coordinator extends TestRunnerImprovedErrorHandling
// Create a coordinator and begin its test procedure.
Coordinator coordinator = new Coordinator(brokerUrl, virtualHost);
+
+ boolean failure = false;
+
TestResult testResult = coordinator.start(testClassNames);
- if (!testResult.wasSuccessful())
+ if (failure)
{
System.exit(FAILURE_EXIT);
}
@@ -176,7 +198,7 @@ public class Coordinator extends TestRunnerImprovedErrorHandling
public TestResult start(String[] testClassNames) throws Exception
{
log.debug("public TestResult start(String[] testClassNames = " + PrettyPrintingUtils.printArray(testClassNames)
- + "): called");
+ + ": called");
// Connect to the broker.
connection = TestClient.createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost);
@@ -185,28 +207,39 @@ public class Coordinator extends TestRunnerImprovedErrorHandling
Destination controlTopic = session.createTopic("iop.control");
Destination responseQueue = session.createQueue("coordinator");
- conversation = new ConversationHelper(connection, controlTopic, responseQueue, LinkedBlockingQueue.class);
+ conversationFactory = new ConversationFactory(connection, responseQueue, LinkedBlockingQueue.class);
+ ConversationFactory.Conversation conversation = conversationFactory.startConversation();
+
+ connection.start();
// Broadcast the compulsory invitation to find out what clients are available to test.
Message invite = session.createMessage();
invite.setStringProperty("CONTROL_TYPE", "INVITE");
invite.setJMSReplyTo(responseQueue);
- conversation.send(invite);
+ conversation.send(controlTopic, invite);
// Wait for a short time, to give test clients an opportunity to reply to the invitation.
- Collection<Message> enlists = conversation.receiveAll(0, 10000);
+ Collection<Message> enlists = conversation.receiveAll(0, 3000);
enlistedClients = extractEnlists(enlists);
- // Run all of the tests in the suite using JUnit.
- TestResult result = super.start(testClassNames);
+ // Run the test in the suite using JUnit.
+ TestResult result = null;
+
+ for (String testClassName : testClassNames)
+ {
+ // Record the current test class, so that the test results can be output to a file incorporating this name.
+ this.currentTestClassName = testClassName;
+
+ result = super.start(new String[] { testClassName });
+ }
// At this point in time, all tests have completed. Broadcast the shutdown message.
Message terminate = session.createMessage();
terminate.setStringProperty("CONTROL_TYPE", "TERMINATE");
- conversation.send(terminate);
+ conversation.send(controlTopic, terminate);
return result;
}
@@ -283,8 +316,69 @@ public class Coordinator extends TestRunnerImprovedErrorHandling
}
// Wrap the tests in an inviting test decorator, to perform the invite/test cycle.
- targetTest = new InvitingTestDecorator(targetTest, enlistedClients, conversation);
+ targetTest = new InvitingTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
+
+ TestSuite suite = new TestSuite();
+ suite.addTest(targetTest);
+
+ // Wrap the tests in a scaled test decorator to them them as a 'batch' in one thread.
+ // targetTest = new ScaledTestDecorator(targetTest, new int[] { 1 });
- return super.doRun(targetTest, wait);
+ return super.doRun(suite, wait);
+ }
+
+ /**
+ * Creates the TestResult object to be used for test runs.
+ *
+ * @return An instance of the test result object.
+ */
+ protected TestResult createTestResult()
+ {
+ log.debug("protected TestResult createTestResult(): called");
+
+ TKTestResult result = new TKTestResult(fPrinter.getWriter(), delay, verbose, testCaseName);
+
+ // Check if a directory to output reports to has been specified and attach test listeners if so.
+ if (reportDir != null)
+ {
+ // Create the report directory if it does not already exist.
+ File reportDirFile = new File(reportDir);
+
+ if (!reportDirFile.exists())
+ {
+ reportDirFile.mkdir();
+ }
+
+ // Create the timings file (make the name of this configurable as a command line parameter).
+ Writer timingsWriter = null;
+
+ try
+ {
+ File timingsFile = new File(reportDirFile, "TEST." + currentTestClassName + ".xml");
+ timingsWriter = new BufferedWriter(new FileWriter(timingsFile), 20000);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Unable to create the log file to write test results to: " + e, e);
+ }
+
+ // Set up a CSV results listener to output the timings to the results file.
+ XMLTestListener listener = new XMLTestListener(timingsWriter, currentTestClassName);
+ result.addListener(listener);
+ result.addTKTestListener(listener);
+
+ // Register the results listeners shutdown hook to flush its data if the test framework is shutdown
+ // prematurely.
+ // registerShutdownHook(listener);
+
+ // Record the start time of the batch.
+ // result.notifyStartBatch();
+
+ // At this point in time the test class has been instantiated, giving it an opportunity to read its parameters.
+ // Inform any test listers of the test properties.
+ result.notifyTestProperties(TestContextProperties.getAccessedProps());
+ }
+
+ return result;
}
}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java
index 2975082631..858ed1a589 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java
@@ -23,6 +23,8 @@ package org.apache.qpid.interop.coordinator;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
+import javax.jms.Connection;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -32,14 +34,14 @@ import junit.framework.TestSuite;
import org.apache.log4j.Logger;
-import org.apache.qpid.util.ConversationHelper;
+import org.apache.qpid.util.ConversationFactory;
import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
/**
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Broadcast test invitations and collect enlists. <td> {@link ConversationHelper}.
+ * <tr><td> Broadcast test invitations and collect enlists. <td> {@link ConversationFactory}.
* <tr><td> Output test failures for clients unwilling to run the test case. <td> {@link Coordinator}
* <tr><td> Execute coordinated test cases. <td> {@link CoordinatingTestCase}
* </table>
@@ -52,7 +54,10 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator
Set<TestClientDetails> allClients;
/** Holds the conversation helper for the control level conversation for coordinating the test through. */
- ConversationHelper conversation;
+ ConversationFactory conversationFactory;
+
+ /** Holds the connection that the control conversation is held over. */
+ Connection connection;
/** Holds the underlying {@link CoordinatingTestCase}s that this decorator wraps. */
WrappedSuiteTestDecorator testSuite;
@@ -61,11 +66,12 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator
* Creates a wrapped suite test decorator from another one.
*
* @param suite The test suite.
- * @param availableClients The list of all clients that responded to the compulsory invite.
+ * @param availableClients The list of all clients that responded to the compulsory invite.
* @param controlConversation The conversation helper for the control level, test coordination conversation.
+ * @param controlConnection The connection that the coordination messages are sent over.
*/
public InvitingTestDecorator(WrappedSuiteTestDecorator suite, Set<TestClientDetails> availableClients,
- ConversationHelper controlConversation)
+ ConversationFactory controlConversation, Connection controlConnection)
{
super(suite);
@@ -74,7 +80,8 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator
testSuite = suite;
allClients = availableClients;
- conversation = controlConversation;
+ conversationFactory = controlConversation;
+ connection = controlConnection;
}
/**
@@ -103,14 +110,17 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator
Set<TestClientDetails> enlists = null;
try
{
- Message invite = conversation.getSession().createMessage();
+ Message invite = conversationFactory.getSession().createMessage();
+ Destination controlTopic = conversationFactory.getSession().createTopic("iop.control");
+ ConversationFactory.Conversation conversation = conversationFactory.startConversation();
+
invite.setStringProperty("CONTROL_TYPE", "INVITE");
- invite.setStringProperty("TEST_NAME", coordTest.getName());
+ invite.setStringProperty("TEST_NAME", coordTest.getTestCaseNameForTestMethod(coordTest.getName()));
- conversation.send(invite);
+ conversation.send(controlTopic, invite);
// Wait for a short time, to give test clients an opportunity to reply to the invitation.
- Collection<Message> replies = conversation.receiveAll(allClients.size(), 10000);
+ Collection<Message> replies = conversation.receiveAll(allClients.size(), 3000);
enlists = Coordinator.extractEnlists(replies);
}
catch (JMSException e)
@@ -143,6 +153,9 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator
coordTest.setSender(enlistedPair.get(0));
coordTest.setReceiver(enlistedPair.get(1));
+ // Pass down the connection to hold the coordination conversation over.
+ coordTest.setConversationFactory(conversationFactory);
+
// Execute the test case.
coordTest.run(testResult);
}
@@ -150,6 +163,16 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator
}
/**
+ * Prints a string summarizing this test decorator, mainly for debugging purposes.
+ *
+ * @return String representation for debugging purposes.
+ */
+ public String toString()
+ {
+ return "InvitingTestDecorator: [ testSuite = " + testSuite + " ]";
+ }
+
+ /**
* Produces all pairs of combinations of elements from two sets. The ordering of the elements in the pair is
* important, that is the pair <l, r> is distinct from <r, l>; both pairs are generated. For any element, i, in
* both the left and right sets, the reflexive pair <i, i> is not generated.
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java
index 27e2c42a9a..42a382a898 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java
@@ -49,4 +49,17 @@ public class OptOutTestCase extends CoordinatingTestCase
{
Assert.fail("One of " + getSender() + " and " + getReceiver() + " opted out of the test.");
}
+
+ /**
+ * Should provide a translation from the junit method name of a test to its test case name as defined in the
+ * interop testing specification. For example the method "testP2P" might map onto the interop test case name
+ * "TC2_BasicP2P".
+ *
+ * @param methodName The name of the JUnit test method.
+ * @return The name of the corresponding interop test case.
+ */
+ public String getTestCaseNameForTestMethod(String methodName)
+ {
+ return "OptOutTest";
+ }
}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java
index 80abd4d4c9..c4a9d39cd8 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java
@@ -82,6 +82,6 @@ public class TestClientDetails
*/
public String toString()
{
- return "clientName = " + clientName + ", privateControlKey = " + privateControlKey;
+ return "TestClientDetails: [ clientName = " + clientName + ", privateControlKey = " + privateControlKey + " ]";
}
}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java
new file mode 100644
index 0000000000..6c7a2a5d52
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java
@@ -0,0 +1,381 @@
+package org.apache.qpid.interop.coordinator;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Writer;
+import java.util.*;
+
+import junit.framework.AssertionFailedError;
+import junit.framework.Test;
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+
+import uk.co.thebadgerset.junit.extensions.listeners.TKTestListener;
+
+/**
+ * Listens for test results for a named test and outputs these in the standard JUnit XML format to the specified
+ * writer.
+ *
+ * <p/>The API for this listener accepts notifications about different aspects of a tests results through different
+ * methods, so some assumption needs to be made as to which test result a notification refers to. For example
+ * {@link #startTest} will be called, then possibly {@link #timing} will be called, even though the test instance is
+ * passed in both cases, it is not enough to distinguish a particular run of the test, as the test case instance may
+ * be being shared between multiple threads, or being run a repeated number of times, and can therfore be re-used
+ * between calls. The listeners make the assumption that, for every test, a unique thread will call {@link #startTest}
+ * and {@link #endTest} to delimit each test. All calls to set test parameters, timings, state and so on, will occur
+ * between the start and end and will be given with the same thread id as the start and end, so the thread id provides
+ * a unqiue value to identify a particular test run against.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ *
+ * @todo Merge this class with CSV test listener, making the collection of results common to both, and only factoring
+ * out the results printing code into sub-classes. Provide a simple XML results formatter with the same format as
+ * the ant XML formatter, and a more structured one for outputing results with timings and summaries from
+ * performance tests.
+ */
+public class XMLTestListener implements TKTestListener
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(XMLTestListener.class);
+
+ /** The results file writer. */
+ protected Writer writer;
+
+ /** Holds the results for individual tests. */
+ // protected Map<Result, Result> results = new LinkedHashMap<Result, Result>();
+ // protected List<Result> results = new ArrayList<Result>();
+
+ /**
+ * Map for holding results on a per thread basis as they come in. A ThreadLocal is not used as sometimes an
+ * explicit thread id must be used, where notifications come from different threads than the ones that called
+ * the test method.
+ */
+ Map<Long, Result> threadLocalResults = Collections.synchronizedMap(new LinkedHashMap<Long, Result>());
+
+ /**
+ * Holds results for tests that have ended. Transferring these results here from the per-thread results map, means
+ * that the thread id is freed for the thread to generate more results.
+ */
+ List<Result> results = new ArrayList<Result>();
+
+ /** Holds the overall error count. */
+ protected int errors = 0;
+
+ /** Holds the overall failure count. */
+ protected int failures = 0;
+
+ /** Holds the overall tests run count. */
+ protected int runs = 0;
+
+ /** Holds the name of the class that tests are being run for. */
+ String testClassName;
+
+ /**
+ * Creates a new XML results output listener that writes to the specified location.
+ *
+ * @param writer The location to write results to.
+ */
+ public XMLTestListener(Writer writer, String testClassName)
+ {
+ log.debug("public XMLTestListener(Writer writer, String testClassName = " + testClassName + "): called");
+
+ this.writer = writer;
+ this.testClassName = testClassName;
+ }
+
+ /**
+ * Resets the test results to the default state of time zero, memory usage zero, parameter zero, test passed.
+ *
+ * @param test The test to resest any results for.
+ * @param threadId Optional thread id if not calling from thread that started the test method. May be null.
+ */
+ public void reset(Test test, Long threadId)
+ {
+ log.debug("public void reset(Test test = " + test + ", Long threadId = " + threadId + "): called");
+
+ XMLTestListener.Result r =
+ (threadId == null) ? threadLocalResults.get(Thread.currentThread().getId()) : threadLocalResults.get(threadId);
+
+ r.error = null;
+ r.failure = null;
+
+ }
+
+ /**
+ * A test started.
+ */
+ public void startTest(Test test)
+ {
+ log.debug("public void startTest(Test test = " + test + "): called");
+
+ Result newResult = new Result(test.getClass().getName(), ((TestCase) test).getName());
+
+ // Initialize the thread local test results.
+ threadLocalResults.put(Thread.currentThread().getId(), newResult);
+ runs++;
+ }
+
+ /**
+ * Should be called every time a test completes with the run time of that test.
+ *
+ * @param test The name of the test.
+ * @param nanos The run time of the test in nanoseconds.
+ * @param threadId Optional thread id if not calling from thread that started the test method. May be null.
+ */
+ public void timing(Test test, long nanos, Long threadId)
+ { }
+
+ /**
+ * Should be called every time a test completed with the amount of memory used before and after the test was run.
+ *
+ * @param test The test which memory was measured for.
+ * @param memStart The total JVM memory used before the test was run.
+ * @param memEnd The total JVM memory used after the test was run.
+ * @param threadId Optional thread id if not calling from thread that started the test method. May be null.
+ */
+ public void memoryUsed(Test test, long memStart, long memEnd, Long threadId)
+ { }
+
+ /**
+ * Should be called every time a parameterized test completed with the int value of its test parameter.
+ *
+ * @param test The test which memory was measured for.
+ * @param parameter The int parameter value.
+ * @param threadId Optional thread id if not calling from thread that started the test method. May be null.
+ */
+ public void parameterValue(Test test, int parameter, Long threadId)
+ { }
+
+ /**
+ * Should be called every time a test completes with the current number of test threads running.
+ *
+ * @param test The test for which the measurement is being generated.
+ * @param threads The number of tests being run concurrently.
+ * @param threadId Optional thread id if not calling from thread that started the test method. May be null.
+ */
+ public void concurrencyLevel(Test test, int threads, Long threadId)
+ { }
+
+ /**
+ * Notifies listeners of the tests read/set properties.
+ *
+ * @param properties The tests read/set properties.
+ */
+ public void properties(Properties properties)
+ { }
+
+ /**
+ * A test ended.
+ */
+ public void endTest(Test test)
+ {
+ log.debug("public void endTest(Test test = " + test + "): called");
+
+ // Move complete test results into the completed tests list.
+ Result r = threadLocalResults.get(Thread.currentThread().getId());
+ results.add(r);
+
+ // Clear all the test results for the thread.
+ threadLocalResults.remove(Thread.currentThread().getId());
+ }
+
+ /**
+ * Called when a test completes. Success, failure and errors. This method should be used when registering an
+ * end test from a different thread than the one that started the test.
+ *
+ * @param test The test which completed.
+ * @param threadId Optional thread id if not calling from thread that started the test method. May be null.
+ */
+ public void endTest(Test test, Long threadId)
+ {
+ log.debug("public void endTest(Test test = " + test + ", Long threadId = " + threadId + "): called");
+
+ // Move complete test results into the completed tests list.
+ Result r =
+ (threadId == null) ? threadLocalResults.get(Thread.currentThread().getId()) : threadLocalResults.get(threadId);
+ results.add(r);
+
+ // Clear all the test results for the thread.
+ threadLocalResults.remove(Thread.currentThread().getId());
+ }
+
+ /**
+ * An error occurred.
+ */
+ public void addError(Test test, Throwable t)
+ {
+ log.debug("public void addError(Test test = " + test + ", Throwable t = " + t + "): called");
+
+ Result r = threadLocalResults.get(Thread.currentThread().getId());
+ r.error = t;
+ errors++;
+ }
+
+ /**
+ * A failure occurred.
+ */
+ public void addFailure(Test test, AssertionFailedError t)
+ {
+ log.debug("public void addFailure(Test test = " + test + ", AssertionFailedError t = " + t + "): called");
+
+ Result r = threadLocalResults.get(Thread.currentThread().getId());
+ r.failure = t;
+ failures++;
+ }
+
+ /**
+ * Called when a test completes to mark it as a test fail. This method should be used when registering a
+ * failure from a different thread than the one that started the test.
+ *
+ * @param test The test which failed.
+ * @param e The assertion that failed the test.
+ * @param threadId Optional thread id if not calling from thread that started the test method. May be null.
+ */
+ public void addFailure(Test test, AssertionFailedError e, Long threadId)
+ {
+ log.debug("public void addFailure(Test test, AssertionFailedError e, Long threadId): called");
+
+ Result r =
+ (threadId == null) ? threadLocalResults.get(Thread.currentThread().getId()) : threadLocalResults.get(threadId);
+ r.failure = e;
+ failures++;
+ }
+
+ /**
+ * Notifies listeners of the start of a complete run of tests.
+ */
+ public void startBatch()
+ {
+ log.debug("public void startBatch(): called");
+
+ // Reset all results counts.
+ threadLocalResults = Collections.synchronizedMap(new HashMap<Long, Result>());
+ errors = 0;
+ failures = 0;
+ runs = 0;
+
+ // Write out the file header.
+ try
+ {
+ writer.write("<?xml version=\"1.0\" ?>\n");
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Unable to write the test results.", e);
+ }
+ }
+
+ /**
+ * Notifies listeners of the end of a complete run of tests.
+ *
+ * @param parameters The optional test parameters to log out with the batch results.
+ */
+ public void endBatch(Properties parameters)
+ {
+ log.debug("public void endBatch(Properties parameters = " + parameters + "): called");
+
+ // Write out the results.
+ try
+ {
+ // writer.write("<?xml version=\"1.0\" ?>\n");
+ writer.write("<testsuite errors=\"" + errors + "\" failures=\"" + failures + "\" tests=\"" + runs + "\" name=\""
+ + testClassName + "\">\n");
+
+ for (Result result : results)
+ {
+ writer.write(" <testcase classname=\"" + result.testClass + "\" name=\"" + result.testName + "\">\n");
+
+ if (result.error != null)
+ {
+ writer.write(" <error type=\"" + result.error.getClass() + "\">");
+ result.error.printStackTrace(new PrintWriter(writer));
+ writer.write(" </error>");
+ }
+ else if (result.failure != null)
+ {
+ writer.write(" <failure type=\"" + result.failure.getClass() + "\">");
+ result.failure.printStackTrace(new PrintWriter(writer));
+ writer.write(" </failure>");
+ }
+
+ writer.write(" </testcase>\n");
+ }
+
+ writer.write("</testsuite>\n");
+ writer.flush();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Unable to write the test results.", e);
+ }
+ }
+
+ /**
+ * Used to capture the results of a particular test run.
+ */
+ protected static class Result
+ {
+ public Result(String testClass, String testName)
+ {
+ this.testClass = testClass;
+ this.testName = testName;
+ }
+
+ public String testClass;
+ public String testName;
+
+ /** Holds the exception that caused error in this test. */
+ public Throwable error;
+
+ /** Holds the assertion exception that caused failure in this test. */
+ public AssertionFailedError failure;
+
+ /** Holds the error count for this test. */
+ // public int errors = 0;
+
+ /** Holds the failure count for this tests. */
+ // public int failures = 0;
+
+ /** Holds the overall tests run count for this test. */
+ // public int runs = 0;
+
+ /*public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+
+ if (!(o instanceof Result))
+ {
+ return false;
+ }
+
+ final Result result = (Result) o;
+
+ if ((testClass != null) ? (!testClass.equals(result.testClass)) : (result.testClass != null))
+ {
+ return false;
+ }
+
+ if ((testName != null) ? (!testName.equals(result.testName)) : (result.testName != null))
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ public int hashCode()
+ {
+ int result;
+ result = ((testClass != null) ? testClass.hashCode() : 0);
+ result = (29 * result) + ((testName != null) ? testName.hashCode() : 0);
+
+ return result;
+ }*/
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java
new file mode 100644
index 0000000000..e20f702dd1
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java
@@ -0,0 +1,64 @@
+package org.apache.qpid.interop.coordinator.testcases;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Message;
+
+import junit.framework.Assert;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.interop.coordinator.CoordinatingTestCase;
+
+/**
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Exercises the interop testing framework without actually sending any test messages.
+ * <td> {@link org.apache.qpid.interop.coordinator.CoordinatingTestCase}
+ * </table>
+ */
+public class CoordinatingTestCase1DummyRun extends CoordinatingTestCase
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(CoordinatingTestCase1DummyRun.class);
+
+ /**
+ * Creates a new coordinating test case with the specified name.
+ *
+ * @param name The test case name.
+ */
+ public CoordinatingTestCase1DummyRun(String name)
+ {
+ super(name);
+ }
+
+ /**
+ * Performs the basic P2P test case, "Test Case 2" in the specification.
+ */
+ public void testDummyRun() throws Exception
+ {
+ log.debug("public void testDummyRun(): called");
+
+ Map<String, Object> testConfig = new HashMap<String, Object>();
+ testConfig.put("TEST_NAME", "TC1_DummyRun");
+
+ Message[] reports = sequenceTest(testConfig);
+
+ // Compare sender and receiver reports.
+ Assert.assertEquals("Expected to get 2 dummy reports.", 2, reports.length);
+ }
+
+ /**
+ * Should provide a translation from the junit method name of a test to its test case name as defined in the
+ * interop testing specification. For example the method "testP2P" might map onto the interop test case name
+ * "TC2_BasicP2P".
+ *
+ * @param methodName The name of the JUnit test method.
+ * @return The name of the corresponding interop test case.
+ */
+ public String getTestCaseNameForTestMethod(String methodName)
+ {
+ return "TC1_DummyRun";
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java
index 4fb9729c4a..d66d42dd51 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java
@@ -1,11 +1,14 @@
package org.apache.qpid.interop.coordinator.testcases;
-import java.util.Properties;
+import java.util.HashMap;
+import java.util.Map;
import javax.jms.Message;
import junit.framework.Assert;
+import org.apache.log4j.Logger;
+
import org.apache.qpid.interop.coordinator.CoordinatingTestCase;
/**
@@ -16,6 +19,9 @@ import org.apache.qpid.interop.coordinator.CoordinatingTestCase;
*/
public class CoordinatingTestCase2BasicP2P extends CoordinatingTestCase
{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(CoordinatingTestCase2BasicP2P.class);
+
/**
* Creates a new coordinating test case with the specified name.
*
@@ -31,10 +37,12 @@ public class CoordinatingTestCase2BasicP2P extends CoordinatingTestCase
*/
public void testBasicP2P() throws Exception
{
- Properties testConfig = new Properties();
- testConfig.setProperty("TEST_CASE", "TC2_BasicP2P");
- testConfig.setProperty("P2P_QUEUE_AND_KEY_NAME", "tc2queue");
- testConfig.setProperty("P2P_NUM_MESSAGES", "50");
+ log.debug("public void testBasicP2P(): called");
+
+ Map<String, Object> testConfig = new HashMap<String, Object>();
+ testConfig.put("TEST_NAME", "TC2_BasicP2P");
+ testConfig.put("P2P_QUEUE_AND_KEY_NAME", "tc2queue");
+ testConfig.put("P2P_NUM_MESSAGES", 50);
Message[] reports = sequenceTest(testConfig);
@@ -45,4 +53,17 @@ public class CoordinatingTestCase2BasicP2P extends CoordinatingTestCase
Assert.assertEquals("The requested number of messages were not sent.", 50, messagesSent);
Assert.assertEquals("Sender and receiver messages sent did not match up.", messagesSent, messagesReceived);
}
+
+ /**
+ * Should provide a translation from the junit method name of a test to its test case name as defined in the
+ * interop testing specification. For example the method "testP2P" might map onto the interop test case name
+ * "TC2_BasicP2P".
+ *
+ * @param methodName The name of the JUnit test method.
+ * @return The name of the corresponding interop test case.
+ */
+ public String getTestCaseNameForTestMethod(String methodName)
+ {
+ return "TC2_BasicP2P";
+ }
}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java
new file mode 100644
index 0000000000..ce46bb87ba
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java
@@ -0,0 +1,71 @@
+package org.apache.qpid.interop.coordinator.testcases;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Message;
+
+import junit.framework.Assert;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.interop.coordinator.CoordinatingTestCase;
+
+/**
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Setup pub/sub test parameters and compare with test output. <td> {@link CoordinatingTestCase}
+ * </table>
+ */
+public class CoordinatingTestCase3BasicPubSub extends CoordinatingTestCase
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(CoordinatingTestCase3BasicPubSub.class);
+
+ /**
+ * Creates a new coordinating test case with the specified name.
+ *
+ * @param name The test case name.
+ */
+ public CoordinatingTestCase3BasicPubSub(String name)
+ {
+ super(name);
+ }
+
+ /**
+ * Performs the basic P2P test case, "Test Case 2" in the specification.
+ */
+ public void testBasicPubSub() throws Exception
+ {
+ log.debug("public void testBasicPubSub(): called");
+
+ Map<String, Object> testConfig = new HashMap<String, Object>();
+ testConfig.put("TEST_NAME", "TC3_BasicPubSub");
+ testConfig.put("PUBSUB_KEY", "tc3route");
+ testConfig.put("PUBSUB_NUM_MESSAGES", 10);
+ testConfig.put("PUBSUB_NUM_RECEIVERS", 5);
+
+ Message[] reports = sequenceTest(testConfig);
+
+ // Compare sender and receiver reports.
+ int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT");
+ int messagesReceived = reports[1].getIntProperty("MESSAGE_COUNT");
+
+ Assert.assertEquals("The requested number of messages were not sent.", 10, messagesSent);
+ Assert.assertEquals("Received messages did not match up to num sent * num receivers.", messagesSent * 5,
+ messagesReceived);
+ }
+
+ /**
+ * Should provide a translation from the junit method name of a test to its test case name as defined in the
+ * interop testing specification. For example the method "testP2P" might map onto the interop test case name
+ * "TC2_BasicP2P".
+ *
+ * @param methodName The name of the JUnit test method.
+ * @return The name of the corresponding interop test case.
+ */
+ public String getTestCaseNameForTestMethod(String methodName)
+ {
+ return "TC3_BasicPubSub";
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java
index 57726285f9..9f769822de 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java
@@ -79,8 +79,10 @@ public interface InteropClientTestCase extends MessageListener
/**
* Performs the test case actions.
+ *
+ * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
*/
- public void start();
+ public void start() throws JMSException;
/**
* Gets a report on the actions performed by the test case in its assigned role.
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
index 6d7ab9c978..0b9c72e1b6 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
@@ -21,10 +21,7 @@
package org.apache.qpid.interop.testclient;
import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
import javax.jms.*;
import javax.naming.Context;
@@ -33,6 +30,9 @@ import javax.naming.NamingException;
import org.apache.log4j.Logger;
+import org.apache.qpid.interop.testclient.testcases.TestCase1DummyRun;
+import org.apache.qpid.interop.testclient.testcases.TestCase2BasicP2P;
+import org.apache.qpid.interop.testclient.testcases.TestCase3BasicPubSub;
import org.apache.qpid.util.ClasspathScanner;
import org.apache.qpid.util.CommandLineParser;
import org.apache.qpid.util.PropertiesUtils;
@@ -61,41 +61,53 @@ public class TestClient implements MessageListener
{
private static Logger log = Logger.getLogger(TestClient.class);
+ public static final String CONNECTION_PROPERTY = "connectionfactory.broker";
+ public static final String CONNECTION_NAME = "broker";
+ public static final String CLIENT_NAME = "java";
+ public static final String DEFAULT_CONNECTION_PROPS_RESOURCE = "org/apache/qpid/interop/connection.properties";
+
/** Holds the URL of the broker to run the tests on. */
- String brokerUrl;
+ public static String brokerUrl;
/** Holds the virtual host to run the tests on. If <tt>null</tt>, then the default virtual host is used. */
- String virtualHost;
+ public static String virtualHost;
/** Holds all the test cases loaded from the classpath. */
Map<String, InteropClientTestCase> testCases = new HashMap<String, InteropClientTestCase>();
InteropClientTestCase currentTestCase;
- public static final String CONNECTION_PROPERTY = "connectionfactory.broker";
- public static final String CONNECTION_NAME = "broker";
- public static final String CLIENT_NAME = "java";
- public static final String DEFAULT_CONNECTION_PROPS_RESOURCE = "org/apache/qpid/interop/connection.properties";
-
private MessageProducer producer;
private Session session;
- public TestClient(String brokerUrl, String virtualHost)
+ private String clientName = CLIENT_NAME;
+
+ /**
+ * Creates a new interop test client, listenting to the specified broker and virtual host, with the specified
+ * client identifying name.
+ *
+ * @param brokerUrl The url of the broker to connect to.
+ * @param virtualHost The virtual host to conect to.
+ * @param clientName The client name to use.
+ */
+ public TestClient(String brokerUrl, String virtualHost, String clientName)
{
log.debug("public TestClient(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost
- + "): called");
+ + ", String clientName = " + clientName + "): called");
// Retain the connection parameters.
this.brokerUrl = brokerUrl;
this.virtualHost = virtualHost;
+ this.clientName = clientName;
}
/**
* The entry point for the interop test coordinator. This client accepts the following command line arguments:
*
* <p/><table>
- * <tr><td> -b <td> The broker URL. <td> Mandatory.
- * <tr><td> -h <td> The virtual host. <td> Optional.
+ * <tr><td> -b <td> The broker URL. <td> Optional.
+ * <tr><td> -h <td> The virtual host. <td> Optional.
+ * <tr><td> -n <td> The test client name. <td> Optional.
* <tr><td> name=value <td> Trailing argument define name/value pairs. Added to system properties. <td> Optional.
* </table>
*
@@ -105,11 +117,13 @@ public class TestClient implements MessageListener
{
// Use the command line parser to evaluate the command line.
CommandLineParser commandLine =
- new CommandLineParser(new String[][]
- {
- { "b", "The broker URL.", "broker", "false" },
- { "h", "The virtual host to use.", "virtual host", "false" }
- });
+ new CommandLineParser(
+ new String[][]
+ {
+ { "b", "The broker URL.", "broker", "false" },
+ { "h", "The virtual host to use.", "virtual host", "false" },
+ { "n", "The test client name.", "name", "false" }
+ });
// Capture the command line arguments or display errors and correct usage and then exit.
Properties options = null;
@@ -128,13 +142,14 @@ public class TestClient implements MessageListener
// Extract the command line options.
String brokerUrl = options.getProperty("b");
String virtualHost = options.getProperty("h");
+ String clientName = options.getProperty("n");
// Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up
// overridden values from there.
commandLine.addCommandLineToSysProperties();
// Create a test client and start it running.
- TestClient client = new TestClient(brokerUrl, virtualHost);
+ TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName);
try
{
@@ -147,13 +162,22 @@ public class TestClient implements MessageListener
}
}
+ /**
+ * Starts the interop test client running. This causes it to start listening for incoming test invites.
+ *
+ * @throws JMSException Any underlying JMSExceptions are allowed to fall through.
+ */
private void start() throws JMSException
{
log.debug("private void start(): called");
// Use a class path scanner to find all the interop test case implementations.
Collection<Class<? extends InteropClientTestCase>> testCaseClasses =
- ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true);
+ new ArrayList<Class<? extends InteropClientTestCase>>();
+ // ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true);
+ // Hard code the test classes till the classpath scanner is fixed.
+ Collections.addAll(testCaseClasses,
+ new Class[] { TestCase1DummyRun.class, TestCase2BasicP2P.class, TestCase3BasicPubSub.class });
// Create all the test case implementations and index them by the test names.
for (Class<? extends InteropClientTestCase> nextClass : testCaseClasses)
@@ -181,9 +205,12 @@ public class TestClient implements MessageListener
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Set this up to listen for control messages.
- MessageConsumer consumer = session.createConsumer(session.createTopic("iop.control." + CLIENT_NAME));
+ MessageConsumer consumer = session.createConsumer(session.createTopic("iop.control." + clientName));
consumer.setMessageListener(this);
+ MessageConsumer consumer2 = session.createConsumer(session.createTopic("iop.control"));
+ consumer2.setMessageListener(this);
+
// Create a producer to send replies with.
producer = session.createProducer(null);
@@ -209,13 +236,13 @@ public class TestClient implements MessageListener
public static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost)
{
log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource
- + ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
+ + ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
try
{
Properties connectionProps =
PropertiesUtils.getProperties(TestClient.class.getClassLoader().getResourceAsStream(
- connectionPropsResource));
+ connectionPropsResource));
if (brokerUrl != null)
{
@@ -270,6 +297,8 @@ public class TestClient implements MessageListener
if (testCaseName != null)
{
+ log.debug("Got an invite to test: " + testCaseName);
+
// Check if the requested test case is available.
InteropClientTestCase testCase = testCases.get(testCaseName);
@@ -282,6 +311,8 @@ public class TestClient implements MessageListener
}
else
{
+ log.debug("Got a compulsory invite.");
+
enlist = true;
}
@@ -290,8 +321,8 @@ public class TestClient implements MessageListener
// Reply with the client name in an Enlist message.
Message enlistMessage = session.createMessage();
enlistMessage.setStringProperty("CONTROL_TYPE", "ENLIST");
- enlistMessage.setStringProperty("CLIENT_NAME", CLIENT_NAME);
- enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + CLIENT_NAME);
+ enlistMessage.setStringProperty("CLIENT_NAME", clientName);
+ enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName);
enlistMessage.setJMSCorrelationID(message.getJMSCorrelationID());
producer.send(message.getJMSReplyTo(), enlistMessage);
@@ -300,7 +331,10 @@ public class TestClient implements MessageListener
else if ("ASSIGN_ROLE".equals(controlType))
{
// Assign the role to the current test case.
- String roleName = message.getStringProperty("");
+ String roleName = message.getStringProperty("ROLE");
+
+ log.debug("Got a role assignment to role: " + roleName);
+
InteropClientTestCase.Roles role = Enum.valueOf(InteropClientTestCase.Roles.class, roleName);
currentTestCase.assignRole(role, message);
@@ -316,9 +350,15 @@ public class TestClient implements MessageListener
{
if ("START".equals(controlType))
{
+ log.debug("Got a start notification.");
+
// Start the current test case.
currentTestCase.start();
}
+ else
+ {
+ log.debug("Got a status request.");
+ }
// Generate the report from the test case and reply with it as a Report message.
Message reportMessage = currentTestCase.getReport(session);
@@ -327,10 +367,17 @@ public class TestClient implements MessageListener
producer.send(message.getJMSReplyTo(), reportMessage);
}
+ else if ("TERMINATE".equals(controlType))
+ {
+ System.out.println("Received termination instruction from coordinator.");
+
+ // Is a cleaner shutdown needed?
+ System.exit(0);
+ }
else
{
// Log a warning about this but otherwise ignore it.
- log.warn("Got an unknown control message: " + message);
+ log.warn("Got an unknown control message, controlType = " + controlType + ", message = " + message);
}
}
catch (JMSException e)
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java
index aa31a59b06..874f86daa9 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java
@@ -84,6 +84,8 @@ public class TestCase1DummyRun implements InteropClientTestCase
public void onMessage(Message message)
{
+ log.debug("public void onMessage(Message message = " + message + "): called");
+
// Ignore any messages.
}
}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java
index c3c05d8fd9..ea62b46451 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java
@@ -20,11 +20,12 @@
*/
package org.apache.qpid.interop.testclient.testcases;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Session;
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
import org.apache.qpid.interop.testclient.InteropClientTestCase;
+import org.apache.qpid.interop.testclient.TestClient;
/**
* Implements test case 2, basic P2P. Sends/received a specified number of messages to a specified route on the
@@ -41,6 +42,30 @@ import org.apache.qpid.interop.testclient.InteropClientTestCase;
*/
public class TestCase2BasicP2P implements InteropClientTestCase
{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(TestCase2BasicP2P.class);
+
+ /** Holds the count of test messages received. */
+ private int messageCount;
+
+ /** The role to be played by the test. */
+ private Roles role;
+
+ /** The number of test messages to send. */
+ private int numMessages;
+
+ /** The routing key to send them to on the default direct exchange. */
+ private Destination sendDestination;
+
+ /** The connection to send the test messages on. */
+ private Connection connection;
+
+ /** The session to send the test messages on. */
+ private Session session;
+
+ /** The producer to send the test messages with. */
+ MessageProducer producer;
+
/**
* Should provide the name of the test case that this class implements. The exact names are defined in the
* interop testing spec.
@@ -49,6 +74,8 @@ public class TestCase2BasicP2P implements InteropClientTestCase
*/
public String getName()
{
+ log.debug("public String getName(): called");
+
return "TC2_BasicP2P";
}
@@ -63,6 +90,8 @@ public class TestCase2BasicP2P implements InteropClientTestCase
*/
public boolean acceptInvite(Message inviteMessage) throws JMSException
{
+ log.debug("public boolean acceptInvite(Message inviteMessage = " + inviteMessage + "): called");
+
// All invites are acceptable.
return true;
}
@@ -79,27 +108,65 @@ public class TestCase2BasicP2P implements InteropClientTestCase
*/
public void assignRole(Roles role, Message assignRoleMessage) throws JMSException
{
+ log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage
+ + "): called");
+
+ // Reset the message count for a new test.
+ messageCount = 0;
+
// Take note of the role to be played.
+ this.role = role;
+
+ // Create a new connection to pass the test messages on.
+ connection =
+ TestClient.createConnection(TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, TestClient.brokerUrl,
+ TestClient.virtualHost);
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Extract and retain the test parameters.
+ numMessages = assignRoleMessage.getIntProperty("P2P_NUM_MESSAGES");
+ sendDestination = session.createQueue(assignRoleMessage.getStringProperty("P2P_QUEUE_AND_KEY_NAME"));
- // Create a new connection to pass the test messages on.
+ log.debug("numMessages = " + numMessages);
+ log.debug("sendDestination = " + sendDestination);
+ log.debug("role = " + role);
- // Check if the sender role is being assigned, and set up a message producer if so.
+ switch (role)
{
- }
+ // Check if the sender role is being assigned, and set up a message producer if so.
+ case SENDER:
+ producer = session.createProducer(sendDestination);
+ break;
+
// Otherwise the receiver role is being assigned, so set this up to listen for messages.
- {
+ case RECEIVER:
+ MessageConsumer consumer = session.createConsumer(sendDestination);
+ consumer.setMessageListener(this);
+ break;
}
+
+ connection.start();
}
/**
* Performs the test case actions.
*/
- public void start()
+ public void start() throws JMSException
{
+ log.debug("public void start(): called");
+
// Check that the sender role is being performed.
+ if (role.equals(Roles.SENDER))
{
+ Message testMessage = session.createTextMessage("test");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ producer.send(testMessage);
+
+ // Increment the message count.
+ messageCount++;
+ }
}
}
@@ -114,11 +181,17 @@ public class TestCase2BasicP2P implements InteropClientTestCase
*/
public Message getReport(Session session) throws JMSException
{
+ log.debug("public Message getReport(Session session): called");
+
// Close the test connection.
+ connection.close();
// Generate a report message containing the count of the number of messages passed.
+ Message report = session.createMessage();
+ report.setStringProperty("CONTROL_TYPE", "REPORT");
+ report.setIntProperty("MESSAGE_COUNT", messageCount);
- return null;
+ return report;
}
/**
@@ -128,6 +201,9 @@ public class TestCase2BasicP2P implements InteropClientTestCase
*/
public void onMessage(Message message)
{
+ log.debug("public void onMessage(Message message = " + message + "): called");
+
// Increment the message count.
+ messageCount++;
}
}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java
new file mode 100644
index 0000000000..2773cad3f3
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java
@@ -0,0 +1,224 @@
+package org.apache.qpid.interop.testclient.testcases;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.interop.testclient.InteropClientTestCase;
+import org.apache.qpid.interop.testclient.TestClient;
+
+/**
+ * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the
+ * default topic exchange, using the specified number of receiver connections. Produces reports on the actual number of
+ * messages sent/received.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Supply the name of the test case that this implements.
+ * <tr><td> Accept/Reject invites based on test parameters.
+ * <tr><td> Adapt to assigned roles.
+ * <tr><td> Send required number of test messages using pub/sub.
+ * <tr><td> Generate test reports.
+ * </table>
+ */
+public class TestCase3BasicPubSub implements InteropClientTestCase
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(TestCase3BasicPubSub.class);
+
+ /** Holds the count of test messages received. */
+ private int messageCount;
+
+ /** The role to be played by the test. */
+ private Roles role;
+
+ /** The number of test messages to send. */
+ private int numMessages;
+
+ /** The number of receiver connection to use. */
+ private int numReceivers;
+
+ /** The routing key to send them to on the default direct exchange. */
+ private Destination sendDestination;
+
+ /** The connections to send/receive the test messages on. */
+ private Connection[] connection;
+
+ /** The sessions to send/receive the test messages on. */
+ private Session[] session;
+
+ /** The producer to send the test messages with. */
+ MessageProducer producer;
+
+ /**
+ * Should provide the name of the test case that this class implements. The exact names are defined in the
+ * interop testing spec.
+ *
+ * @return The name of the test case that this implements.
+ */
+ public String getName()
+ {
+ log.debug("public String getName(): called");
+
+ return "TC3_BasicPubSub";
+ }
+
+ /**
+ * Determines whether the test invite that matched this test case is acceptable.
+ *
+ * @param inviteMessage The invitation to accept or reject.
+ *
+ * @return <tt>true</tt> to accept the invitation, <tt>false</tt> to reject it.
+ *
+ * @throws javax.jms.JMSException Any JMSException resulting from reading the message are allowed to fall through.
+ */
+ public boolean acceptInvite(Message inviteMessage) throws JMSException
+ {
+ log.debug("public boolean acceptInvite(Message inviteMessage = " + inviteMessage + "): called");
+
+ // All invites are acceptable.
+ return true;
+ }
+
+ /**
+ * Assigns the role to be played by this test case. The test parameters are fully specified in the
+ * assignment message. When this method return the test case will be ready to execute.
+ *
+ * @param role The role to be played; sender or receiver.
+ *
+ * @param assignRoleMessage The role assingment message, contains the full test parameters.
+ *
+ * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
+ */
+ public void assignRole(Roles role, Message assignRoleMessage) throws JMSException
+ {
+ log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage
+ + "): called");
+
+ // Reset the message count for a new test.
+ messageCount = 0;
+
+ // Take note of the role to be played.
+ this.role = role;
+
+ // Extract and retain the test parameters.
+ numMessages = assignRoleMessage.getIntProperty("PUBSUB_NUM_MESSAGES");
+ numReceivers = assignRoleMessage.getIntProperty("PUBSUB_NUM_RECEIVERS");
+ String sendKey = assignRoleMessage.getStringProperty("PUBSUB_KEY");
+
+ log.debug("numMessages = " + numMessages);
+ log.debug("numReceivers = " + numReceivers);
+ log.debug("sendKey = " + sendKey);
+ log.debug("role = " + role);
+
+ switch (role)
+ {
+ // Check if the sender role is being assigned, and set up a single message producer if so.
+ case SENDER:
+ // Create a new connection to pass the test messages on.
+ connection = new Connection[1];
+ session = new Session[1];
+
+ connection[0] =
+ TestClient.createConnection(TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, TestClient.brokerUrl,
+ TestClient.virtualHost);
+ session[0] = connection[0].createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Extract and retain the test parameters.
+ sendDestination = session[0].createTopic(sendKey);
+
+ producer = session[0].createProducer(sendDestination);
+ break;
+
+ // Otherwise the receiver role is being assigned, so set this up to listen for messages on the required number
+ // of receiver connections.
+ case RECEIVER:
+ // Create the required number of receiver connections.
+ connection = new Connection[numReceivers];
+ session = new Session[numReceivers];
+
+ for (int i = 0; i < numReceivers; i++)
+ {
+ connection[i] =
+ TestClient.createConnection(TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, TestClient.brokerUrl,
+ TestClient.virtualHost);
+ session[i] = connection[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ sendDestination = session[i].createTopic(sendKey);
+
+ MessageConsumer consumer = session[i].createConsumer(sendDestination);
+ consumer.setMessageListener(this);
+ }
+
+ break;
+ }
+
+ // Start all the connection dispatcher threads running.
+ for (int i = 0; i < connection.length; i++)
+ {
+ connection[i].start();
+ }
+ }
+
+ /**
+ * Performs the test case actions.
+ */
+ public void start() throws JMSException
+ {
+ log.debug("public void start(): called");
+
+ // Check that the sender role is being performed.
+ if (role.equals(Roles.SENDER))
+ {
+ Message testMessage = session[0].createTextMessage("test");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ producer.send(testMessage);
+
+ // Increment the message count.
+ messageCount++;
+ }
+ }
+ }
+
+ /**
+ * Gets a report on the actions performed by the test case in its assigned role.
+ *
+ * @param session The session to create the report message in.
+ *
+ * @return The report message.
+ *
+ * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through.
+ */
+ public Message getReport(Session session) throws JMSException
+ {
+ log.debug("public Message getReport(Session session): called");
+
+ // Close the test connections.
+ for (int i = 0; i < connection.length; i++)
+ {
+ connection[i].close();
+ }
+
+ // Generate a report message containing the count of the number of messages passed.
+ Message report = session.createMessage();
+ report.setStringProperty("CONTROL_TYPE", "REPORT");
+ report.setIntProperty("MESSAGE_COUNT", messageCount);
+
+ return report;
+ }
+
+ /**
+ * Counts incoming test messages.
+ *
+ * @param message The incoming test message.
+ */
+ public void onMessage(Message message)
+ {
+ log.debug("public void onMessage(Message message = " + message + "): called");
+
+ // Increment the message count.
+ messageCount++;
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java
new file mode 100644
index 0000000000..4ca2fe8ff5
--- /dev/null
+++ b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java
@@ -0,0 +1,479 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+/**
+ * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation
+ * over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant
+ * on the queue implementation). Clients of this de-multiplexer can wait on messages, defined by message correlation ids.
+ *
+ * <p/>One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a
+ * multiplexed messaging route. This can be usefull, as JMS sessions are not multi-threaded. Setting up the conversation
+ * with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order
+ * governed by the asynchronous message flow. For example, something like the following code could run a multi-threaded
+ * conversation (the conversation methods can be called many times in parallel):
+ *
+ * <p/><pre>
+ * class Initiator
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, null,
+ * java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * initiateConversation()
+ * {
+ * try {
+ * // Exchange greetings.
+ * conversation.send(sendDestination, conversation.getSession().createTextMessage("Hello."));
+ * Message greeting = conversation.receive();
+ *
+ * // Exchange goodbyes.
+ * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ * Message goodbye = conversation.receive();
+ * } finally {
+ * conversation.end();
+ * }
+ * }
+ * }
+ *
+ * class Responder
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, receiveDestination,
+ * java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * respondToConversation()
+ * {
+ * try {
+ * // Exchange greetings.
+ * Message greeting = conversation.receive();
+ * conversation.send(conversation.getSession().createTextMessage("Hello."));
+ *
+ * // Exchange goodbyes.
+ * Message goodbye = conversation.receive();
+ * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ * } finally {
+ * conversation.end();
+ * }
+ * }
+ * }
+ * </pre>
+ *
+ * <p/>Conversation correlation id's are generated on a per thread basis.
+ *
+ * <p/>The same session is shared amongst all conversations. Calls to send are therefore synchronized because JMS
+ * sessions are not multi-threaded.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><th> Associate messages to an ongoing conversation using correlation ids.
+ * <tr><td> Auto manage sessions for conversations.
+ * <tr><td> Store messages not in a conversation in dead letter box.
+ * </table>
+ */
+public class ConversationFactory
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(ConversationFactory.class);
+
+ /** Holds a map from correlation id's to queues. */
+ private Map<Long, BlockingQueue<Message>> idsToQueues = new HashMap<Long, BlockingQueue<Message>>();
+
+ /** Holds the connection over which the conversation is conducted. */
+ private Connection connection;
+
+ /** Holds the session over which the conversation is conduxted. */
+ private Session session;
+
+ /** The message consumer for incoming messages. */
+ MessageConsumer consumer;
+
+ /** The message producer for outgoing messages. */
+ MessageProducer producer;
+
+ /** The well-known or temporary destination to receive replies on. */
+ Destination receiveDestination;
+
+ /** Holds the queue implementation class for the reply queue. */
+ Class<? extends BlockingQueue> queueClass;
+
+ /** Used to hold any replies that are received outside of the context of a conversation. */
+ BlockingQueue<Message> deadLetterBox = new LinkedBlockingQueue<Message>();
+
+ /* Used to hold conversation state on a per thread basis. */
+ /*
+ ThreadLocal<Conversation> threadLocals =
+ new ThreadLocal<Conversation>()
+ {
+ protected Conversation initialValue()
+ {
+ Conversation settings = new Conversation();
+ settings.conversationId = conversationIdGenerator.getAndIncrement();
+
+ return settings;
+ }
+ };
+ */
+
+ /** Generates new coversation id's as needed. */
+ AtomicLong conversationIdGenerator = new AtomicLong();
+
+ /**
+ * Creates a conversation helper on the specified connection with the default sending destination, and listening
+ * to the specified receiving destination.
+ *
+ * @param connection The connection to build the conversation helper on.
+ * @param receiveDestination The destination to listen to for incoming messages. This may be null to use a temporary
+ * queue.
+ * @param queueClass The queue implementation class.
+ *
+ * @throws JMSException All undelying JMSExceptions are allowed to fall through.
+ */
+ public ConversationFactory(Connection connection, Destination receiveDestination,
+ Class<? extends BlockingQueue> queueClass) throws JMSException
+ {
+ log.debug("public ConversationFactory(Connection connection, Destination receiveDestination = " + receiveDestination
+ + ", Class<? extends BlockingQueue> queueClass = " + queueClass + "): called");
+
+ this.connection = connection;
+ this.queueClass = queueClass;
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Check if a well-known receive destination has been provided, or use a temporary queue if not.
+ this.receiveDestination = (receiveDestination != null) ? receiveDestination : session.createTemporaryQueue();
+
+ consumer = session.createConsumer(receiveDestination);
+ producer = session.createProducer(null);
+
+ consumer.setMessageListener(new Receiver());
+ }
+
+ /**
+ * Creates a new conversation context.
+ *
+ * @return A new conversation context.
+ */
+ public Conversation startConversation()
+ {
+ log.debug("public Conversation startConversation(): called");
+
+ Conversation conversation = new Conversation();
+ conversation.conversationId = conversationIdGenerator.getAndIncrement();
+
+ return conversation;
+ }
+
+ /**
+ * Ensures that the reply queue for a conversation exists.
+ *
+ * @param conversationId The conversation correlation id.
+ */
+ private void initQueueForId(long conversationId)
+ {
+ if (!idsToQueues.containsKey(conversationId))
+ {
+ idsToQueues.put(conversationId, ReflectionUtils.<BlockingQueue>newInstance(queueClass));
+ }
+ }
+
+ /**
+ * Clears the dead letter box, returning all messages that were in it.
+ *
+ * @return All messages in the dead letter box.
+ */
+ public Collection<Message> emptyDeadLetterBox()
+ {
+ log.debug("public Collection<Message> emptyDeadLetterBox(): called");
+
+ Collection<Message> result = new ArrayList<Message>();
+ deadLetterBox.drainTo(result);
+
+ return result;
+ }
+
+ /**
+ * Gets the session over which the conversation is conducted.
+ *
+ * @return The session over which the conversation is conducted.
+ */
+ public Session getSession()
+ {
+ // Conversation settings = threadLocals.get();
+
+ return session;
+ }
+
+ /**
+ * Used to hold a conversation context. This consists of a correlating id for the conversation, and a reply
+ * destination automatically updated to the last received reply-to destination.
+ */
+ public class Conversation
+ {
+ /** Holds the correlation id for the context. */
+ long conversationId;
+
+ /**
+ * Holds the send destination for the context. This will automatically be updated to the most recently received
+ * reply-to destination.
+ */
+ Destination sendDestination;
+
+ /**
+ * Sends a message to the default sending location. The correlation id of the message will be assigned by this
+ * method, overriding any previously set value.
+ *
+ * @param sendDestination The destination to send to. This may be null to use the last received reply-to
+ * destination.
+ * @param message The message to send.
+ *
+ * @throws JMSException All undelying JMSExceptions are allowed to fall through. This will also be thrown if no
+ * send destination is specified and there is no most recent reply-to destination available
+ * to use.
+ */
+ public void send(Destination sendDestination, Message message) throws JMSException
+ {
+ log.debug("public void send(Destination sendDestination = " + sendDestination + ", Message message = " + message
+ + "): called");
+
+ // Conversation settings = threadLocals.get();
+ // long conversationId = conversationId;
+ message.setJMSCorrelationID(Long.toString(conversationId));
+ message.setJMSReplyTo(receiveDestination);
+
+ // Ensure that the reply queue for this conversation exists.
+ initQueueForId(conversationId);
+
+ // Check if an overriding send to destination has been set or use the last reply-to if not.
+ Destination sendTo = null;
+
+ if (sendDestination != null)
+ {
+ sendTo = sendDestination;
+ }
+ else if (sendDestination != null)
+ {
+ sendTo = sendDestination;
+ }
+ else
+ {
+ throw new JMSException("The send destination was specified, and no most recent reply-to available to use.");
+ }
+
+ // Send the message.
+ synchronized (this)
+ {
+ producer.send(sendTo, message);
+ }
+ }
+
+ /**
+ * Gets the next message in an ongoing conversation. This method may block until such a message is received.
+ *
+ * @return The next incoming message in the conversation.
+ *
+ * @throws JMSException All undelying JMSExceptions are allowed to fall through. Thrown if the received message
+ * did not have its reply-to destination set up.
+ */
+ public Message receive() throws JMSException
+ {
+ log.debug("public Message receive(): called");
+
+ // Conversation settings = threadLocals.get();
+ // long conversationId = settings.conversationId;
+
+ // Ensure that the reply queue for this conversation exists.
+ initQueueForId(conversationId);
+
+ BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+
+ try
+ {
+ Message result = queue.take();
+
+ // Keep the reply-to destination to send replies to.
+ sendDestination = result.getJMSReplyTo();
+
+ return result;
+ }
+ catch (InterruptedException e)
+ {
+ return null;
+ }
+ }
+
+ /**
+ * Gets many messages in an ongoing conversation. If a limit is specified, then once that many messages are
+ * received they will be returned. If a timeout is specified, then all messages up to the limit, received within
+ * that timespan will be returned. At least one of the message count or timeout should be set to a value of
+ * 1 or greater.
+ *
+ * @param num The number of messages to receive, or all if this is less than 1.
+ * @param timeout The timeout in milliseconds to receive the messages in, or forever if this is less than 1.
+ *
+ * @return All messages received within the count limit and the timeout.
+ *
+ * @throws JMSException All undelying JMSExceptions are allowed to fall through.
+ */
+ public Collection<Message> receiveAll(int num, long timeout) throws JMSException
+ {
+ log.debug("public Collection<Message> receiveAll(int num = " + num + ", long timeout = " + timeout
+ + "): called");
+
+ // Check that a timeout or message count was set.
+ if ((num < 1) && (timeout < 1))
+ {
+ throw new IllegalArgumentException("At least one of message count (num) or timeout must be set.");
+ }
+
+ // Ensure that the reply queue for this conversation exists.
+ initQueueForId(conversationId);
+ BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+
+ // Used to collect the received messages in.
+ Collection<Message> result = new ArrayList<Message>();
+
+ // Used to indicate when the timeout or message count has expired.
+ boolean receiveMore = true;
+
+ int messageCount = 0;
+
+ // Receive messages until the timeout or message count expires.
+ do
+ {
+ try
+ {
+ Message next = null;
+
+ // Try to receive the message with a timeout if one has been set.
+ if (timeout > 0)
+ {
+ next = queue.poll(timeout, TimeUnit.MILLISECONDS);
+
+ // Check if the timeout expired, and stop receiving if so.
+ if (next == null)
+ {
+ receiveMore = false;
+ }
+ }
+ // Receive the message without a timeout.
+ else
+ {
+ next = queue.take();
+ }
+
+ // Increment the message count if a message was received.
+ messageCount += (next != null) ? 1 : 0;
+
+ // Check if all the requested messages were received, and stop receiving if so.
+ if ((num > 0) && (messageCount >= num))
+ {
+ receiveMore = false;
+ }
+
+ // Keep the reply-to destination to send replies to.
+ sendDestination = (next != null) ? next.getJMSReplyTo() : sendDestination;
+
+ if (next != null)
+ {
+ result.add(next);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Restore the threads interrupted status.
+ Thread.currentThread().interrupt();
+
+ // Stop receiving but return the messages received so far.
+ receiveMore = false;
+ }
+ }
+ while (receiveMore);
+
+ return result;
+ }
+
+ /**
+ * Completes the conversation. Any correlation id's pertaining to the conversation are no longer valid, and any
+ * incoming messages using them will go to the dead letter box.
+ */
+ public void end()
+ {
+ log.debug("public void end(): called");
+
+ // Ensure that the thread local for the current thread is cleaned up.
+ // Conversation settings = threadLocals.get();
+ // long conversationId = settings.conversationId;
+ // threadLocals.remove();
+
+ // Ensure that its queue is removed from the queue map.
+ BlockingQueue<Message> queue = idsToQueues.remove(conversationId);
+
+ // Move any outstanding messages on the threads conversation id into the dead letter box.
+ queue.drainTo(deadLetterBox);
+ }
+ }
+
+ /**
+ * Implements the message listener for this conversation handler.
+ */
+ protected class Receiver implements MessageListener
+ {
+ /**
+ * Handles all incoming messages in the ongoing conversations. These messages are split up by correaltion id
+ * and placed into queues.
+ *
+ * @param message The incoming message.
+ */
+ public void onMessage(Message message)
+ {
+ log.debug("public void onMessage(Message message = " + message + "): called");
+
+ try
+ {
+ Long conversationId = Long.parseLong(message.getJMSCorrelationID());
+
+ // Find the converstaion queue to place the message on. If there is no conversation for the message id,
+ // the the dead letter box queue is used.
+ BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+ queue = (queue == null) ? deadLetterBox : queue;
+
+ queue.put(message);
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java
deleted file mode 100644
index 1fd1fee377..0000000000
--- a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.util;
-
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.*;
-
-/**
- * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation
- * over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant
- * on the queue implementation). Clients of this de-multiplexer can wait on messages, defined by message correlation ids.
- *
- * <p/>One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a
- * multiplexed messaging route. This can be usefull, as JMS sessions are not multi-threaded. Setting up the conversation
- * with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order
- * governed by the asynchronous message flow. For example, something like the following code could run a multi-threaded
- * conversation (the conversation methods can be called many times in parallel):
- *
- * <p/><pre>
- * ConversationHelper conversation = new ConversationHelper(connection, sendDesitination, replyDestination,
- * java.util.concurrent.LinkedBlockingQueue.class);
- *
- * initiateConversation()
- * {
- * try {
- * // Exchange greetings.
- * conversation.send(conversation.getSession().createTextMessage("Hello."));
- * Message greeting = conversation.receive();
- *
- * // Exchange goodbyes.
- * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
- * Message goodbye = conversation.receive();
- * } finally {
- * conversation.end();
- * }
- * }
- *
- * respondToConversation()
- * {
- * try {
- * // Exchange greetings.
- * Message greeting = conversation.receive();
- * conversation.send(conversation.getSession().createTextMessage("Hello."));
- *
- * // Exchange goodbyes.
- * Message goodbye = conversation.receive();
- * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
- * } finally {
- * conversation.end();
- * }
- * }
- *
- * </pre>
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><th> Associate messages to an ongoing conversation using correlation ids.
- * <tr><td> Auto manage sessions for conversations.
- * <tr><td> Store messages not in a conversation in dead letter box.
- * </table>
- *
- * @todo Non-transactional, can use shared session. Transactional, must have session per-thread. Session pool? In
- * transactional mode, commits must happen before receiving, or no replies will come in. (unless there were some
- * pending on the queue?). Also, having received on a particular session, must ensure that session is used for all
- * subsequent sends and receive at least until the transaction is committed. So a message selector must be used
- * to restrict receives on that session to prevent it picking up messages bound for other conversations. Or use
- * a temporary response queue, with only that session listening to it.
- *
- * @todo Want something convenient that hides details. Write out some example use cases to get the best feel for
- * it. Pass in connection, send destination, receive destination. Provide endConvo, send, receive
- * methods. Bind corrId, session etc. on thread locals? Clean on endConvo. Provide deadLetter box, that
- * uncorrelated or late messages go in. Provide time-out on wait methods, and global time-out.
- * PingPongProducer provides a good use-case example (sends messages, waits for replies).
- *
- * @todo New correlationId on every send? or correlation id per conversation? or callers choice.
- */
-public class ConversationHelper
-{
- /** Holds a map from correlation id's to queues. */
- private Map<Long, BlockingQueue<Message>> idsToQueues = new HashMap<Long, BlockingQueue<Message>>();
-
- private Session session;
- private MessageProducer producer;
- private MessageConsumer consumer;
-
- Class<? extends BlockingQueue> queueClass;
-
- BlockingQueue<Message> deadLetterBox = new LinkedBlockingQueue<Message>();
-
- ThreadLocal<PerThreadSettings> threadLocals =
- new ThreadLocal<PerThreadSettings>()
- {
- protected PerThreadSettings initialValue()
- {
- PerThreadSettings settings = new PerThreadSettings();
- settings.conversationId = conversationIdGenerator.getAndIncrement();
-
- return settings;
- }
- };
-
- /** Generates new coversation id's as needed. */
- AtomicLong conversationIdGenerator = new AtomicLong();
-
- /**
- * Creates a conversation helper on the specified connection with the default sending destination, and listening
- * to the specified receiving destination.
- *
- * @param connection The connection to build the conversation helper on.
- * @param sendDestination The default sending destiation for all messages.
- * @param receiveDestination The destination to listen to for incoming messages.
- * @param queueClass The queue implementation class.
- *
- * @throws JMSException All undelying JMSExceptions are allowed to fall through.
- */
- public ConversationHelper(Connection connection, Destination sendDestination, Destination receiveDestination,
- Class<? extends BlockingQueue> queueClass) throws JMSException
- {
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(sendDestination);
- consumer = session.createConsumer(receiveDestination);
-
- consumer.setMessageListener(new Receiver());
-
- this.queueClass = queueClass;
- }
-
- /**
- * Sends a message to the default sending location. The correlation id of the message will be assigned by this
- * method, overriding any previously set value.
- *
- * @param message The message to send.
- *
- * @throws JMSException All undelying JMSExceptions are allowed to fall through.
- */
- public void send(Message message) throws JMSException
- {
- PerThreadSettings settings = threadLocals.get();
- long conversationId = settings.conversationId;
- message.setJMSCorrelationID(Long.toString(conversationId));
-
- // Ensure that the reply queue for this conversation exists.
- initQueueForId(conversationId);
-
- producer.send(message);
- }
-
- /**
- * Ensures that the reply queue for a conversation exists.
- *
- * @param conversationId The conversation correlation id.
- */
- private void initQueueForId(long conversationId)
- {
- if (!idsToQueues.containsKey(conversationId))
- {
- idsToQueues.put(conversationId, ReflectionUtils.<BlockingQueue>newInstance(queueClass));
- }
- }
-
- /**
- * Gets the next message in an ongoing conversation. This method may block until such a message is received.
- *
- * @return The next incoming message in the conversation.
- */
- public Message receive()
- {
- PerThreadSettings settings = threadLocals.get();
- long conversationId = settings.conversationId;
-
- // Ensure that the reply queue for this conversation exists.
- initQueueForId(conversationId);
-
- BlockingQueue<Message> queue = idsToQueues.get(conversationId);
-
- try
- {
- return queue.take();
- }
- catch (InterruptedException e)
- {
- return null;
- }
- }
-
- /**
- * Gets many messages in an ongoing conversation. If a limit is specified, then once that many messages are
- * received they will be returned. If a timeout is specified, then all messages up to the limit, received within
- * that timespan will be returned.
- *
- * @param num The number of messages to receive, or all if this is less than 1.
- * @param timeout The timeout in milliseconds to receive the messages in, or forever if this is less than 1.
- *
- * @return All messages received within the count limit and the timeout.
- */
- public Collection<Message> receiveAll(int num, long timeout)
- {
- Collection<Message> result = new ArrayList<Message>();
-
- for (int i = 0; i < num; i++)
- {
- result.add(receive());
- }
-
- return result;
- }
-
- /**
- * Completes the conversation. Any open transactions are committed. Any correlation id's pertaining to the
- * conversation are no longer valid, and any incoming messages using them will go to the dead letter box.
- */
- public void end()
- {
- // Ensure that the thread local for the current thread is cleaned up.
- PerThreadSettings settings = threadLocals.get();
- long conversationId = settings.conversationId;
- threadLocals.remove();
-
- // Ensure that its queue is removed from the queue map.
- BlockingQueue<Message> queue = idsToQueues.remove(conversationId);
-
- // Move any outstanding messages on the threads conversation id into the dead letter box.
- queue.drainTo(deadLetterBox);
- }
-
- /**
- * Clears the dead letter box, returning all messages that were in it.
- *
- * @return All messages in the dead letter box.
- */
- public Collection<Message> emptyDeadLetterBox()
- {
- Collection<Message> result = new ArrayList<Message>();
- deadLetterBox.drainTo(result);
-
- return result;
- }
-
- /**
- * Implements the message listener for this conversation handler.
- */
- protected class Receiver implements MessageListener
- {
- /**
- * Handles all incoming messages in the ongoing conversations. These messages are split up by correaltion id
- * and placed into queues.
- *
- * @param message The incoming message.
- */
- public void onMessage(Message message)
- {
- try
- {
- Long conversationId = Long.parseLong(message.getJMSCorrelationID());
-
- // Find the converstaion queue to place the message on. If there is no conversation for the message id,
- // the the dead letter box queue is used.
- BlockingQueue<Message> queue = idsToQueues.get(conversationId);
- queue = (queue == null) ? deadLetterBox : queue;
-
- queue.put(message);
- }
- catch (JMSException e)
- {
- throw new RuntimeException(e);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
-
- protected class PerThreadSettings
- {
- /** Holds the correlation id for the current threads conversation. */
- long conversationId;
- }
-
- public Session getSession()
- {
- return session;
- }
-}