summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/client/message')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSReplyToTest.java169
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java6
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java7
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java24
5 files changed, 186 insertions, 22 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
index 2d326d73b8..5b350d2d89 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.test.client.message;
-import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.test.utils.JMXTestUtils;
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSReplyToTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSReplyToTest.java
new file mode 100644
index 0000000000..fe8180d6c6
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSReplyToTest.java
@@ -0,0 +1,169 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client.message;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * Tests that {@link Message#setJMSReplyTo(Destination)} can be used to pass a {@link Destination} between
+ * messaging clients as is commonly used in request/response messaging pattern implementations.
+ */
+public class JMSReplyToTest extends QpidBrokerTestCase
+{
+ private AtomicReference<Throwable> _caughtException = new AtomicReference<Throwable>();
+ private Queue _requestQueue;
+ private Connection _connection;
+ private Session _session;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _requestQueue = startAsyncRespondingJmsConsumerOnSeparateConnection();
+
+ _connection = getConnection();
+ _connection.start();
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testRequestResponseUsingJmsReplyTo() throws Exception
+ {
+ final String responseQueueName = getTestQueueName() + ".response";
+ Queue replyToQueue = _session.createQueue(responseQueueName);
+ sendRequestAndValidateResponse(replyToQueue);
+ }
+
+ public void testRequestResponseUsingTemporaryJmsReplyTo() throws Exception
+ {
+ TemporaryQueue replyToQueue = _session.createTemporaryQueue();
+
+ sendRequestAndValidateResponse(replyToQueue);
+ }
+
+ private void sendRequestAndValidateResponse(Queue replyToQueue) throws JMSException, Exception
+ {
+ MessageConsumer replyConsumer = _session.createConsumer(replyToQueue);
+
+ Message requestMessage = createRequestMessageWithJmsReplyTo(_session, replyToQueue);
+ sendRequest(_requestQueue, _session, requestMessage);
+
+ receiveAndValidateResponse(replyConsumer, requestMessage);
+
+ assertNull("Async responder caught unexpected exception", _caughtException.get());
+ }
+
+ private Message createRequestMessageWithJmsReplyTo(Session session, Queue replyToQueue)
+ throws JMSException
+ {
+ Message requestMessage = session.createTextMessage("My request");
+ requestMessage.setJMSReplyTo(replyToQueue);
+ return requestMessage;
+ }
+
+ private void sendRequest(final Queue requestQueue, Session session, Message requestMessage) throws Exception
+ {
+ MessageProducer producer = session.createProducer(requestQueue);
+ producer.send(requestMessage);
+ }
+
+ private void receiveAndValidateResponse(MessageConsumer replyConsumer, Message requestMessage) throws JMSException
+ {
+ Message responseMessage = replyConsumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Response message not received", responseMessage);
+ assertEquals("Correlation id of the response should match message id of the request",
+ responseMessage.getJMSCorrelationID(), requestMessage.getJMSMessageID());
+ }
+
+ private Queue startAsyncRespondingJmsConsumerOnSeparateConnection() throws Exception
+ {
+ final String requestQueueName = getTestQueueName() + ".request";
+ final Connection responderConnection = getConnection();
+ responderConnection.start();
+ final Session responderSession = responderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Queue requestQueue = responderSession.createQueue(requestQueueName);
+
+ final MessageConsumer requestConsumer = responderSession.createConsumer(requestQueue);
+ requestConsumer.setMessageListener(new AsyncResponder(responderSession));
+
+ return requestQueue;
+ }
+
+ private final class AsyncResponder implements MessageListener
+ {
+ private final Session _responderSession;
+
+ private AsyncResponder(Session responderSession)
+ {
+ _responderSession = responderSession;
+ }
+
+ @Override
+ public void onMessage(Message requestMessage)
+ {
+ try
+ {
+ Destination replyTo = getReplyToQueue(requestMessage);
+
+ Message responseMessage = _responderSession.createMessage();
+ responseMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
+
+ sendResponseToQueue(replyTo, responseMessage);
+ }
+ catch (Throwable t)
+ {
+ _caughtException.set(t);
+ }
+ }
+
+ private Destination getReplyToQueue(Message requestMessage) throws JMSException, IllegalStateException
+ {
+ Destination replyTo = requestMessage.getJMSReplyTo();
+ if (replyTo == null)
+ {
+ throw new IllegalStateException("JMSReplyTo was null on message " + requestMessage);
+ }
+ return replyTo;
+ }
+
+ private void sendResponseToQueue(Destination replyTo, Message responseMessage)
+ throws JMSException
+ {
+ MessageProducer responseProducer = _responderSession.createProducer(replyTo);
+ responseProducer.send(responseMessage);
+ }
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java
index 1071861d47..dc1f690b1e 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java
@@ -20,14 +20,12 @@
*/
package org.apache.qpid.test.client.message;
-import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.BytesMessage;
import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
@@ -49,7 +47,7 @@ public class MessageToStringTest extends QpidBrokerTestCase
private Connection _connection;
private Session _session;
private Queue _queue;
- MessageConsumer _consumer;
+ private MessageConsumer _consumer;
private static final String BYTE_TEST = "MapByteTest";
public void setUp() throws Exception
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java
index fa16152b69..3bd2c4a44e 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java
@@ -20,10 +20,9 @@
*/
package org.apache.qpid.test.client.message;
-import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -38,8 +37,8 @@ public class ObjectMessageTest extends QpidBrokerTestCase
{
private Connection _connection;
private Session _session;
- MessageConsumer _consumer;
- MessageProducer _producer;
+ private MessageConsumer _consumer;
+ private MessageProducer _producer;
public void setUp() throws Exception
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
index b1c8b5682f..2c7f426306 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
@@ -20,7 +20,16 @@
*/
package org.apache.qpid.test.client.message;
-import java.util.concurrent.CountDownLatch;
+import junit.framework.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.BasicMessageProducer;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -32,18 +41,7 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
-
-import junit.framework.Assert;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.BasicMessageProducer;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.CountDownLatch;
public class SelectorTest extends QpidBrokerTestCase implements MessageListener
{