summaryrefslogtreecommitdiff
path: root/java/integrationtests
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-03-15 10:12:57 +0000
committerRobert Greig <rgreig@apache.org>2007-03-15 10:12:57 +0000
commita249dbcb7d2d3211ef3f5a32b49a867ab5e9407c (patch)
treeabee9edbd57fde28a1d2afa2a64984eca3287711 /java/integrationtests
parent261ddfa996d1a94a2663bdad81efc51de101db3c (diff)
downloadqpid-python-a249dbcb7d2d3211ef3f5a32b49a867ab5e9407c.tar.gz
Commit of interop test stuff prior to M2 branch.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@518559 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/integrationtests')
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java7
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java2
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java269
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java121
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java144
5 files changed, 427 insertions, 116 deletions
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
index 12faa64528..efeda78abf 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
@@ -67,8 +67,8 @@ public abstract class CoordinatingTestCase extends TestCase
* @param allClients The list of all possible test clients that may accept the invitation.
* @param testProperties The test case definition.
*/
- public CoordinatingTestCase(TestClientDetails sender, TestClientDetails receiver,
- Collection<TestClientDetails> allClients, Properties testProperties)
+ public void TestCase(TestClientDetails sender, TestClientDetails receiver, Collection<TestClientDetails> allClients,
+ Properties testProperties)
{ }
/**
@@ -83,8 +83,7 @@ public abstract class CoordinatingTestCase extends TestCase
*
* @return The test results from the senders and receivers.
*/
- protected Object[] sequenceTest(TestClientDetails sender, TestClientDetails receiver,
- Collection<TestClientDetails> allParticipatingClients, Properties testProperties)
+ protected Object[] sequenceTest(TestClientDetails sender, TestClientDetails receiver, Properties testProperties)
{
// Check if the sender and recevier did not accept the invite to this test.
{
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java
index 3a201b6899..fcfb5a08fd 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java
@@ -28,8 +28,10 @@ package org.apache.qpid.interop.coordinator;
public class TestClientDetails
{
/** The test clients name. */
+ public String clientName;
/* The test clients unqiue sequence number. Not currently used. */
/** The routing key of the test clients control topic. */
+ public String privateControlKey;
}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
index 2c04a8e52b..a623687a0f 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
@@ -20,12 +20,22 @@
*/
package org.apache.qpid.interop.testclient;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
-import javax.jms.Message;
-import javax.jms.MessageListener;
+import javax.jms.*;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.util.ClasspathScanner;
import org.apache.qpid.util.CommandLineParser;
+import org.apache.qpid.util.PropertiesUtils;
/**
* Implements a test client as described in the interop testing spec
@@ -49,76 +59,26 @@ import org.apache.qpid.util.CommandLineParser;
*/
public class TestClient implements MessageListener
{
+ private static Logger log = Logger.getLogger(TestClient.class);
+
/** Holds the URL of the broker to run the tests on. */
String brokerUrl;
/** Holds the virtual host to run the tests on. If <tt>null</tt>, then the default virtual host is used. */
String virtualHost;
- /** Defines an enumeration of the control message types and handling behaviour for each. */
- protected enum ControlMessages implements MessageListener
- {
- INVITE_COMPULSORY
- {
- public void onMessage(Message message)
- {
- // Reply with the client name in an Enlist message.
- }
- },
- INVITE
- {
- public void onMessage(Message message)
- {
- // Extract the test properties.
-
- // Check if the requested test case is available.
- {
- // Make the requested test case the current test case.
+ /** Holds all the test cases loaded from the classpath. */
+ Map<String, InteropClientTestCase> testCases = new HashMap<String, InteropClientTestCase>();
- // Reply by accepting the invite in an Enlist message.
- }
- }
- },
- ASSIGN_ROLE
- {
- public void onMessage(Message message)
- {
- // Extract the test properties.
+ InteropClientTestCase currentTestCase;
- // Reply by accepting the role in an Accept Role message.
- }
- },
- START
- {
- public void onMessage(Message message)
- {
- // Start the current test case.
+ public static final String CONNECTION_PROPERTY = "connectionfactory.broker";
+ public static final String CONNECTION_NAME = "broker";
+ public static final String CLIENT_NAME = "java";
+ public static final String DEFAULT_CONNECTION_PROPS_RESOURCE = "org/apache/qpid/interop/client/connection.properties";
- // Generate the report from the test case and reply with it as a Report message.
- }
- },
- STATUS_REQUEST
- {
- public void onMessage(Message message)
- {
- // Generate the report from the test case and reply with it as a Report message.
- }
- },
- UNKNOWN
- {
- public void onMessage(Message message)
- {
- // Log a warning about this but otherwise ignore it.
- }
- };
-
- /**
- * Handles control messages appropriately depending on the message type.
- *
- * @param message The incoming message to handle.
- */
- public abstract void onMessage(Message message);
- }
+ private MessageProducer producer;
+ private Session session;
public TestClient(String brokerUrl, String virtualHost)
{
@@ -172,42 +132,199 @@ public class TestClient implements MessageListener
// Create a test client and start it running.
TestClient client = new TestClient(brokerUrl, virtualHost);
- client.start();
+
+ try
+ {
+ client.start();
+ }
+ catch (Exception e)
+ {
+ log.error("The test client was unable to start.", e);
+ System.exit(1);
+ }
}
- private void start()
+ private void start() throws JMSException
{
// Use a class path scanner to find all the interop test case implementations.
+ Collection<Class<? extends InteropClientTestCase>> testCaseClasses =
+ ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true);
// Create all the test case implementations and index them by the test names.
+ for (Class<? extends InteropClientTestCase> nextClass : testCaseClasses)
+ {
+ try
+ {
+ InteropClientTestCase testCase = nextClass.newInstance();
+ testCases.put(testCase.getName(), testCase);
+ }
+ catch (InstantiationException e)
+ {
+ log.warn("Could not instantiate test case class: " + nextClass.getName(), e);
+ // Ignored.
+ }
+ catch (IllegalAccessException e)
+ {
+ log.warn("Could not instantiate test case class due to illegal access: " + nextClass.getName(), e);
+ // Ignored.
+ }
+ }
// Open a connection to communicate with the coordinator on.
+ Connection connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost);
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Set this up to listen for control messages.
+ MessageConsumer consumer = session.createConsumer(session.createTopic("iop.control." + CLIENT_NAME));
+ consumer.setMessageListener(this);
// Create a producer to send replies with.
+ producer = session.createProducer(null);
+
+ // Start listening for incoming control messages.
+ connection.start();
}
/**
- * Handles all incoming control messages.
+ * Establishes a JMS connection using a properties file and qpids built in JNDI implementation. This is a simple
+ * convenience method for code that does anticipate handling connection failures. All exceptions that indicate
+ * that the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure
+ * handler.
*
- * @param message The incoming message.
+ * @todo Make username/password configurable. Allow multiple urls for fail over. Once it feels right, move it
+ * to a Utils library class.
+ *
+ * @param connectionPropsResource The name of the connection properties file.
+ * @param brokerUrl The broker url to connect to, <tt>null</tt> to use the default from the properties.
+ * @param virtualHost The virtual host to connectio to, <tt>null</tt> to use the default.
+ *
+ * @return A JMS conneciton.
*/
- public void onMessage(Message message)
+ private static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost)
{
- // Delegate the message handling to the message type specific handler.
- extractMessageType(message).onMessage(message);
+ try
+ {
+ Properties connectionProps =
+ PropertiesUtils.getProperties(TestClient.class.getClassLoader().getResourceAsStream(
+ connectionPropsResource));
+
+ String connectionString =
+ "amqp://guest:guest/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'";
+ connectionProps.setProperty(CONNECTION_PROPERTY, connectionString);
+
+ Context ctx = new InitialContext(connectionProps);
+
+ ConnectionFactory cf = (ConnectionFactory) ctx.lookup(CONNECTION_NAME);
+ Connection connection = cf.createConnection();
+
+ return connection;
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (NamingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException(e);
+ }
}
/**
- * Determines the control messsage type of incoming messages.
- *
- * @param message The message to determine the type of.
+ * Handles all incoming control messages.
*
- * @return The control message type of the message.
+ * @param message The incoming message.
*/
- protected ControlMessages extractMessageType(Message message)
+ public void onMessage(Message message)
{
- return null;
+ try
+ {
+ String controlType = message.getStringProperty("CONTROL_TYPE");
+ String testName = message.getStringProperty("TEST_NAME");
+
+ // Check if the message is a test invite.
+ if ("INVITE".equals(controlType))
+ {
+ String testCaseName = message.getStringProperty("TEST_NAME");
+
+ // Flag used to indicate that an enlist should be sent. Only enlist to compulsory invites or invites
+ // for which test cases exist.
+ boolean enlist = false;
+
+ if (testCaseName != null)
+ {
+ // Check if the requested test case is available.
+ InteropClientTestCase testCase = testCases.get(testCaseName);
+
+ if (testCase != null)
+ {
+ // Make the requested test case the current test case.
+ currentTestCase = testCase;
+ enlist = true;
+ }
+ }
+ else
+ {
+ enlist = true;
+ }
+
+ if (enlist)
+ {
+ // Reply with the client name in an Enlist message.
+ Message enlistMessage = session.createMessage();
+ enlistMessage.setStringProperty("CONTROL_TYPE", "ENLIST");
+ enlistMessage.setStringProperty("CLIENT_NAME", CLIENT_NAME);
+ enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + CLIENT_NAME);
+ enlistMessage.setJMSCorrelationID(message.getJMSCorrelationID());
+
+ producer.send(message.getJMSReplyTo(), enlistMessage);
+ }
+ }
+ else if ("ASSIGN_ROLE".equals(controlType))
+ {
+ // Assign the role to the current test case.
+ String roleName = message.getStringProperty("");
+ InteropClientTestCase.Roles role = Enum.valueOf(InteropClientTestCase.Roles.class, roleName);
+
+ currentTestCase.assignRole(role, message);
+
+ // Reply by accepting the role in an Accept Role message.
+ Message acceptRoleMessage = session.createMessage();
+ acceptRoleMessage.setStringProperty("CONTROL_TYPE", "ACCEPT_ROLE");
+ acceptRoleMessage.setJMSCorrelationID(message.getJMSCorrelationID());
+
+ producer.send(message.getJMSReplyTo(), acceptRoleMessage);
+ }
+ else if ("START".equals(controlType) || "STATUS_REQUEST".equals(controlType))
+ {
+ if ("START".equals(controlType))
+ {
+ // Start the current test case.
+ currentTestCase.start();
+ }
+
+ // Generate the report from the test case and reply with it as a Report message.
+ Message reportMessage = currentTestCase.getReport(session);
+ reportMessage.setStringProperty("CONTROL_TYPE", "REPORT");
+ reportMessage.setJMSCorrelationID(message.getJMSCorrelationID());
+
+ producer.send(message.getJMSReplyTo(), reportMessage);
+ }
+ else
+ {
+ // Log a warning about this but otherwise ignore it.
+ log.warn("Got an unknown control message: " + message);
+ }
+ }
+ catch (JMSException e)
+ {
+ // Log a warning about this, but otherwise ignore it.
+ log.warn("A JMSException occurred whilst handling a message.");
+ log.debug("Got JMSException whilst handling message: " + message, e);
+ }
}
}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java
index 1dd00da53b..35946e6c4e 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java
@@ -22,6 +22,10 @@ package org.apache.qpid.util;
import java.io.File;
import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
/**
* An ClasspathScanner scans the classpath for classes that implement an interface or extend a base class and have names
@@ -38,10 +42,12 @@ import java.util.*;
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Find all classes matching type and name pattern on the classpath.
* </table>
+ *
+ * @todo Add logic to scan jars as well as directories.
*/
public class ClasspathScanner
{
- static final int SUFFIX_LENGTH = ".class".length();
+ private static final Logger log = Logger.getLogger(ClasspathScanner.class);
/**
* Scans the classpath and returns all classes that extend a specified class and match a specified name.
@@ -49,58 +55,124 @@ public class ClasspathScanner
* that have a default constructor).
*
* @param matchingClass The class or interface to match.
- * @param matchingRegexp The reular expression to match against the class name.
+ * @param matchingRegexp The regular expression to match against the class name.
* @param beanOnly Flag to indicate that onyl classes with default constructors should be matched.
*
* @return All the classes that match this collector.
*/
- public static Collection<Class<?>> getMatches(Class<?> matchingClass, String matchingRegexp, boolean beanOnly)
+ public static <T> Collection<Class<? extends T>> getMatches(Class<T> matchingClass, String matchingRegexp,
+ boolean beanOnly)
{
+ // Build a compiled regular expression from the pattern to match.
+ Pattern matchPattern = Pattern.compile(matchingRegexp);
+
String classPath = System.getProperty("java.class.path");
- Map result = new HashMap();
+ Map<String, Class<? extends T>> result = new HashMap<String, Class<? extends T>>();
+ // Find matching classes starting from all roots in the classpath.
for (String path : splitClassPath(classPath))
{
- gatherFiles(new File(path), "", result);
+ gatherFiles(new File(path), "", result, matchPattern, matchingClass);
}
return result.values();
}
- private static void gatherFiles(File classRoot, String classFileName, Map result)
+ /**
+ * Finds all matching classes rooted at a given location in the file system. If location is a directory it
+ * is recursively examined.
+ *
+ * @param classRoot The root of the current point in the file system being examined.
+ * @param classFileName The name of the current file or directory to examine.
+ * @param result The accumulated mapping from class names to classes that match the scan.
+ *
+ * @todo Recursion ok as file system depth is not likely to exhaust the stack. Might be better to replace with
+ * iteration.
+ */
+ private static <T> void gatherFiles(File classRoot, String classFileName, Map<String, Class<? extends T>> result,
+ Pattern matchPattern, Class<? extends T> matchClass)
{
File thisRoot = new File(classRoot, classFileName);
+ // If the current location is a file, check if it is a matching class.
if (thisRoot.isFile())
{
- if (matchesName(classFileName))
+ // Check that the file has a matching name.
+ if (matchesName(classFileName, matchPattern))
{
String className = classNameFromFile(classFileName);
- result.put(className, className);
+
+ // Check that the class has matching type.
+ try
+ {
+ Class<?> candidateClass = Class.forName(className);
+
+ Class matchedClass = matchesClass(candidateClass, matchClass);
+
+ if (matchedClass != null)
+ {
+ result.put(className, matchedClass);
+ }
+ }
+ catch (ClassNotFoundException e)
+ {
+ // Ignore this. The matching class could not be loaded.
+ log.debug("Got ClassNotFoundException, ignoring.", e);
+ }
}
return;
}
-
- String[] contents = thisRoot.list();
-
- if (contents != null)
+ // Otherwise the current location is a directory, so examine all of its contents.
+ else
{
- for (String content : contents)
+ String[] contents = thisRoot.list();
+
+ if (contents != null)
{
- gatherFiles(classRoot, classFileName + File.separatorChar + content, result);
+ for (String content : contents)
+ {
+ gatherFiles(classRoot, classFileName + File.separatorChar + content, result, matchPattern, matchClass);
+ }
}
}
}
- private static boolean matchesName(String classFileName)
+ /**
+ * Checks if the specified class file name corresponds to a class with name matching the specified regular expression.
+ *
+ * @param classFileName The class file name.
+ * @param matchPattern The regular expression pattern to match.
+ *
+ * @return <tt>true</tt> if the class name matches, <tt>false</tt> otherwise.
+ */
+ private static boolean matchesName(String classFileName, Pattern matchPattern)
{
- return classFileName.endsWith(".class") && (classFileName.indexOf('$') < 0) && (classFileName.indexOf("Test") > 0);
+ String className = classNameFromFile(classFileName);
+ Matcher matcher = matchPattern.matcher(className);
+
+ return matcher.matches();
}
- private static boolean matchesInterface()
+ /**
+ * Checks if the specified class to compare extends the base class being scanned for.
+ *
+ * @param matchingClass The base class to match against.
+ * @param toMatch The class to match against the base class.
+ *
+ * @return The class to check, cast as an instance of the class to match if the class extends the base class, or
+ * <tt>null</tt> otherwise.
+ */
+ private static <T> Class<? extends T> matchesClass(Class<?> matchingClass, Class<? extends T> toMatch)
{
- return false;
+ try
+ {
+ return matchingClass.asSubclass(toMatch);
+ }
+ catch (ClassCastException e)
+ {
+ return null;
+ }
}
/**
@@ -125,17 +197,22 @@ public class ClasspathScanner
}
/**
- * convert /a/b.class to a.b
+ * Translates from the filename of a class to its fully qualified classname. Files are named using forward slash
+ * seperators and end in ".class", whereas fully qualified class names use "." sperators and no ".class" ending.
*
- * @param classFileName
+ * @param classFileName The filename of the class to translate to a class name.
*
- * @return
+ * @return The fully qualified class name.
*/
private static String classNameFromFile(String classFileName)
{
+ // Remove the .class ending.
+ String s = classFileName.substring(0, classFileName.length() - ".class".length());
- String s = classFileName.substring(0, classFileName.length() - SUFFIX_LENGTH);
+ // Turn / seperators in . seperators.
String s2 = s.replace(File.separatorChar, '.');
+
+ // Knock off any leading . caused by a leading /.
if (s2.startsWith("."))
{
return s2.substring(1);
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java
index bda089045a..631cab9f35 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.util;
-import java.util.Collection;
+import java.util.*;
import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageListener;
+import javax.jms.*;
/**
* A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation
@@ -89,7 +89,8 @@ import javax.jms.MessageListener;
* transactional mode, commits must happen before receiving, or no replies will come in. (unless there were some
* pending on the queue?). Also, having received on a particular session, must ensure that session is used for all
* subsequent sends and receive at least until the transaction is committed. So a message selector must be used
- * to restrict receives on that session to prevent it picking up messages bound for other conversations.
+ * to restrict receives on that session to prevent it picking up messages bound for other conversations. Or use
+ * a temporary response queue, with only that session listening to it.
*
* @todo Want something convenient that hides many details. Write out some example use cases to get the best feel for
* it. Pass in connection, send destination, receive destination. Provide endConvo, send, receive
@@ -101,6 +102,32 @@ import javax.jms.MessageListener;
*/
public class ConversationHelper
{
+ /** Holds a map from correlation id's to queues. */
+ private Map<Long, BlockingQueue<Message>> idsToQueues = new HashMap<Long, BlockingQueue<Message>>();
+
+ private Session session;
+ private MessageProducer producer;
+ private MessageConsumer consumer;
+
+ Class<? extends BlockingQueue<Message>> queueClass;
+
+ BlockingQueue<Message> deadLetterBox = new LinkedBlockingQueue<Message>();
+
+ ThreadLocal<PerThreadSettings> threadLocals =
+ new ThreadLocal<PerThreadSettings>()
+ {
+ protected PerThreadSettings initialValue()
+ {
+ PerThreadSettings settings = new PerThreadSettings();
+ settings.conversationId = conversationIdGenerator.getAndIncrement();
+
+ return settings;
+ }
+ };
+
+ /** Generates new coversation id's as needed. */
+ AtomicLong conversationIdGenerator = new AtomicLong();
+
/**
* Creates a conversation helper on the specified connection with the default sending destination, and listening
* to the specified receiving destination.
@@ -109,19 +136,53 @@ public class ConversationHelper
* @param sendDestination The default sending destiation for all messages.
* @param receiveDestination The destination to listen to for incoming messages.
* @param queueClass The queue implementation class.
+ *
+ * @throws JMSException All undelying JMSExceptions are allowed to fall through.
*/
public ConversationHelper(Connection connection, Destination sendDestination, Destination receiveDestination,
- Class<? extends Queue> queueClass)
- { }
+ Class<? extends BlockingQueue<Message>> queueClass) throws JMSException
+ {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = session.createProducer(sendDestination);
+ consumer = session.createConsumer(receiveDestination);
+
+ consumer.setMessageListener(new Receiver());
+
+ this.queueClass = queueClass;
+ }
/**
* Sends a message to the default sending location. The correlation id of the message will be assigned by this
* method, overriding any previously set value.
*
* @param message The message to send.
+ *
+ * @throws JMSException All undelying JMSExceptions are allowed to fall through.
+ */
+ public void send(Message message) throws JMSException
+ {
+ PerThreadSettings settings = threadLocals.get();
+ long conversationId = settings.conversationId;
+ message.setJMSCorrelationID(Long.toString(conversationId));
+
+ // Ensure that the reply queue for this conversation exists.
+ initQueueForId(conversationId);
+
+ producer.send(message);
+ }
+
+ /**
+ * Ensures that the reply queue for a conversation exists.
+ *
+ * @param conversationId The conversation correlation id.
*/
- public void send(Message message)
- { }
+ private void initQueueForId(long conversationId)
+ {
+ if (!idsToQueues.containsKey(conversationId))
+ {
+ idsToQueues.put(conversationId, ReflectionUtils.<BlockingQueue<Message>>newInstance(queueClass));
+ }
+ }
/**
* Gets the next message in an ongoing conversation. This method may block until such a message is received.
@@ -130,7 +191,22 @@ public class ConversationHelper
*/
public Message receive()
{
- return null;
+ PerThreadSettings settings = threadLocals.get();
+ long conversationId = settings.conversationId;
+
+ // Ensure that the reply queue for this conversation exists.
+ initQueueForId(conversationId);
+
+ BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+
+ try
+ {
+ return queue.take();
+ }
+ catch (InterruptedException e)
+ {
+ return null;
+ }
}
/**
@@ -138,7 +214,18 @@ public class ConversationHelper
* conversation are no longer valid, and any incoming messages using them will go to the dead letter box.
*/
public void end()
- { }
+ {
+ // Ensure that the thread local for the current thread is cleaned up.
+ PerThreadSettings settings = threadLocals.get();
+ long conversationId = settings.conversationId;
+ threadLocals.remove();
+
+ // Ensure that its queue is removed from the queue map.
+ BlockingQueue<Message> queue = idsToQueues.remove(conversationId);
+
+ // Move any outstanding messages on the threads conversation id into the dead letter box.
+ queue.drainTo(deadLetterBox);
+ }
/**
* Clears the dead letter box, returning all messages that were in it.
@@ -147,7 +234,10 @@ public class ConversationHelper
*/
public Collection<Message> emptyDeadLetterBox()
{
- return null;
+ Collection<Message> result = new LinkedList<Message>();
+ deadLetterBox.drainTo(result);
+
+ return result;
}
/**
@@ -162,6 +252,32 @@ public class ConversationHelper
* @param message The incoming message.
*/
public void onMessage(Message message)
- { }
+ {
+ try
+ {
+ Long conversationId = Long.parseLong(message.getJMSCorrelationID());
+
+ // Find the converstaion queue to place the message on. If there is no conversation for the message id,
+ // the the dead letter box queue is used.
+ BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+ queue = (queue == null) ? deadLetterBox : queue;
+
+ queue.put(message);
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ protected class PerThreadSettings
+ {
+ /** Holds the correlation id for the current threads conversation. */
+ long conversationId;
}
}