/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.test.client.message; import java.util.concurrent.atomic.AtomicReference; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TemporaryQueue; import org.apache.qpid.test.utils.QpidBrokerTestCase; /** * Tests that {@link Message#setJMSReplyTo(Destination)} can be used to pass a {@link Destination} between * messaging clients as is commonly used in request/response messaging pattern implementations. */ public class JMSReplyToTest extends QpidBrokerTestCase { private AtomicReference _caughtException = new AtomicReference(); private Queue _requestQueue; private Connection _connection; private Session _session; @Override protected void setUp() throws Exception { super.setUp(); _requestQueue = startAsyncRespondingJmsConsumerOnSeparateConnection(); _connection = getConnection(); _connection.start(); _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } public void testRequestResponseUsingJmsReplyTo() throws Exception { final String responseQueueName = getTestQueueName() + ".response"; Queue replyToQueue = _session.createQueue(responseQueueName); sendRequestAndValidateResponse(replyToQueue); } public void testRequestResponseUsingTemporaryJmsReplyTo() throws Exception { TemporaryQueue replyToQueue = _session.createTemporaryQueue(); sendRequestAndValidateResponse(replyToQueue); } private void sendRequestAndValidateResponse(Queue replyToQueue) throws JMSException, Exception { MessageConsumer replyConsumer = _session.createConsumer(replyToQueue); Message requestMessage = createRequestMessageWithJmsReplyTo(_session, replyToQueue); sendRequest(_requestQueue, _session, requestMessage); receiveAndValidateResponse(replyConsumer, requestMessage); assertNull("Async responder caught unexpected exception", _caughtException.get()); } private Message createRequestMessageWithJmsReplyTo(Session session, Queue replyToQueue) throws JMSException { Message requestMessage = session.createTextMessage("My request"); requestMessage.setJMSReplyTo(replyToQueue); return requestMessage; } private void sendRequest(final Queue requestQueue, Session session, Message requestMessage) throws Exception { MessageProducer producer = session.createProducer(requestQueue); producer.send(requestMessage); } private void receiveAndValidateResponse(MessageConsumer replyConsumer, Message requestMessage) throws JMSException { Message responseMessage = replyConsumer.receive(RECEIVE_TIMEOUT); assertNotNull("Response message not received", responseMessage); assertEquals("Correlation id of the response should match message id of the request", responseMessage.getJMSCorrelationID(), requestMessage.getJMSMessageID()); } private Queue startAsyncRespondingJmsConsumerOnSeparateConnection() throws Exception { final String requestQueueName = getTestQueueName() + ".request"; final Connection responderConnection = getConnection(); responderConnection.start(); final Session responderSession = responderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Queue requestQueue = responderSession.createQueue(requestQueueName); final MessageConsumer requestConsumer = responderSession.createConsumer(requestQueue); requestConsumer.setMessageListener(new AsyncResponder(responderSession)); return requestQueue; } private final class AsyncResponder implements MessageListener { private final Session _responderSession; private AsyncResponder(Session responderSession) { _responderSession = responderSession; } @Override public void onMessage(Message requestMessage) { try { Destination replyTo = getReplyToQueue(requestMessage); Message responseMessage = _responderSession.createMessage(); responseMessage.setJMSCorrelationID(requestMessage.getJMSMessageID()); sendResponseToQueue(replyTo, responseMessage); } catch (Throwable t) { _caughtException.set(t); } } private Destination getReplyToQueue(Message requestMessage) throws JMSException, IllegalStateException { Destination replyTo = requestMessage.getJMSReplyTo(); if (replyTo == null) { throw new IllegalStateException("JMSReplyTo was null on message " + requestMessage); } return replyTo; } private void sendResponseToQueue(Destination replyTo, Message responseMessage) throws JMSException { MessageProducer responseProducer = _responderSession.createProducer(replyTo); responseProducer.send(responseMessage); } } }