diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/client/message')
5 files changed, 186 insertions, 22 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java index 2d326d73b8..5b350d2d89 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.test.client.message; -import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.client.CustomJMSXProperty; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.test.utils.JMXTestUtils; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSReplyToTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSReplyToTest.java new file mode 100644 index 0000000000..fe8180d6c6 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSReplyToTest.java @@ -0,0 +1,169 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.client.message; + +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TemporaryQueue; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * Tests that {@link Message#setJMSReplyTo(Destination)} can be used to pass a {@link Destination} between + * messaging clients as is commonly used in request/response messaging pattern implementations. + */ +public class JMSReplyToTest extends QpidBrokerTestCase +{ + private AtomicReference<Throwable> _caughtException = new AtomicReference<Throwable>(); + private Queue _requestQueue; + private Connection _connection; + private Session _session; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + _requestQueue = startAsyncRespondingJmsConsumerOnSeparateConnection(); + + _connection = getConnection(); + _connection.start(); + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + public void testRequestResponseUsingJmsReplyTo() throws Exception + { + final String responseQueueName = getTestQueueName() + ".response"; + Queue replyToQueue = _session.createQueue(responseQueueName); + sendRequestAndValidateResponse(replyToQueue); + } + + public void testRequestResponseUsingTemporaryJmsReplyTo() throws Exception + { + TemporaryQueue replyToQueue = _session.createTemporaryQueue(); + + sendRequestAndValidateResponse(replyToQueue); + } + + private void sendRequestAndValidateResponse(Queue replyToQueue) throws JMSException, Exception + { + MessageConsumer replyConsumer = _session.createConsumer(replyToQueue); + + Message requestMessage = createRequestMessageWithJmsReplyTo(_session, replyToQueue); + sendRequest(_requestQueue, _session, requestMessage); + + receiveAndValidateResponse(replyConsumer, requestMessage); + + assertNull("Async responder caught unexpected exception", _caughtException.get()); + } + + private Message createRequestMessageWithJmsReplyTo(Session session, Queue replyToQueue) + throws JMSException + { + Message requestMessage = session.createTextMessage("My request"); + requestMessage.setJMSReplyTo(replyToQueue); + return requestMessage; + } + + private void sendRequest(final Queue requestQueue, Session session, Message requestMessage) throws Exception + { + MessageProducer producer = session.createProducer(requestQueue); + producer.send(requestMessage); + } + + private void receiveAndValidateResponse(MessageConsumer replyConsumer, Message requestMessage) throws JMSException + { + Message responseMessage = replyConsumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Response message not received", responseMessage); + assertEquals("Correlation id of the response should match message id of the request", + responseMessage.getJMSCorrelationID(), requestMessage.getJMSMessageID()); + } + + private Queue startAsyncRespondingJmsConsumerOnSeparateConnection() throws Exception + { + final String requestQueueName = getTestQueueName() + ".request"; + final Connection responderConnection = getConnection(); + responderConnection.start(); + final Session responderSession = responderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue requestQueue = responderSession.createQueue(requestQueueName); + + final MessageConsumer requestConsumer = responderSession.createConsumer(requestQueue); + requestConsumer.setMessageListener(new AsyncResponder(responderSession)); + + return requestQueue; + } + + private final class AsyncResponder implements MessageListener + { + private final Session _responderSession; + + private AsyncResponder(Session responderSession) + { + _responderSession = responderSession; + } + + @Override + public void onMessage(Message requestMessage) + { + try + { + Destination replyTo = getReplyToQueue(requestMessage); + + Message responseMessage = _responderSession.createMessage(); + responseMessage.setJMSCorrelationID(requestMessage.getJMSMessageID()); + + sendResponseToQueue(replyTo, responseMessage); + } + catch (Throwable t) + { + _caughtException.set(t); + } + } + + private Destination getReplyToQueue(Message requestMessage) throws JMSException, IllegalStateException + { + Destination replyTo = requestMessage.getJMSReplyTo(); + if (replyTo == null) + { + throw new IllegalStateException("JMSReplyTo was null on message " + requestMessage); + } + return replyTo; + } + + private void sendResponseToQueue(Destination replyTo, Message responseMessage) + throws JMSException + { + MessageProducer responseProducer = _responderSession.createProducer(replyTo); + responseProducer.send(responseMessage); + } + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java index 1071861d47..dc1f690b1e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java @@ -20,14 +20,12 @@ */ package org.apache.qpid.test.client.message; -import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.BytesMessage; import javax.jms.Connection; -import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; @@ -49,7 +47,7 @@ public class MessageToStringTest extends QpidBrokerTestCase private Connection _connection; private Session _session; private Queue _queue; - MessageConsumer _consumer; + private MessageConsumer _consumer; private static final String BYTE_TEST = "MapByteTest"; public void setUp() throws Exception diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java index fa16152b69..3bd2c4a44e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java @@ -20,10 +20,9 @@ */ package org.apache.qpid.test.client.message; -import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.JMSException; @@ -38,8 +37,8 @@ public class ObjectMessageTest extends QpidBrokerTestCase { private Connection _connection; private Session _session; - MessageConsumer _consumer; - MessageProducer _producer; + private MessageConsumer _consumer; + private MessageProducer _producer; public void setUp() throws Exception { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java index b1c8b5682f..2c7f426306 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java @@ -20,7 +20,16 @@ */ package org.apache.qpid.test.client.message; -import java.util.concurrent.CountDownLatch; +import junit.framework.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.BasicMessageProducer; +import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -32,18 +41,7 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; - -import junit.framework.Assert; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.BasicMessageProducer; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.concurrent.CountDownLatch; public class SelectorTest extends QpidBrokerTestCase implements MessageListener { |