/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.test.unit.close; import junit.framework.Assert; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.junit.concurrency.TestRunnable; import org.apache.qpid.junit.concurrency.ThreadTestCoordinator; import javax.jms.Connection; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Session; /** * This test forces the situation where a session is closed whilst a message consumer is still in its onMessage method. * Running in AUTO_ACK mode, the close call ought to wait until the onMessage method completes, and the ack is sent * before closing the connection. * *

CRC Card
Responsibilities Collaborations
Check that * closing a connection whilst handling a message, blocks till completion of the handler.
*/ public class CloseBeforeAckTest extends QpidBrokerTestCase { private static final Logger log = LoggerFactory.getLogger(CloseBeforeAckTest.class); Connection connection; Session session; public static final String TEST_QUEUE_NAME = "TestQueue"; private int TEST_COUNT = 25; class TestThread1 extends TestRunnable implements MessageListener { public void runWithExceptions() throws Exception { // Set this up to listen for message on the test session. session.createConsumer(session.createQueue(TEST_QUEUE_NAME)).setMessageListener(this); } public void onMessage(Message message) { // Give thread 2 permission to close the session. allow(new int[] { 1 }); // Wait until thread 2 has closed the connection, or is blocked waiting for this to complete. waitFor(new int[] { 1 }, true); } } TestThread1 testThread1 = new TestThread1(); TestRunnable testThread2 = new TestRunnable() { public void runWithExceptions() throws Exception { // Send a message to be picked up by thread 1. session.createProducer(null).send(session.createQueue(TEST_QUEUE_NAME), session.createTextMessage("Hi there thread 1!")); // Wait for thread 1 to pick up the message and give permission to continue. waitFor(new int[] { 0 }, false); // Close the connection. session.close(); // Allow thread 1 to continue to completion, if it is erronously still waiting. allow(new int[] { 1 }); } }; public void testCloseBeforeAutoAck_QPID_397() throws Exception { // Create a session in auto acknowledge mode. This problem shows up in auto acknowledge if the client acks // message at the end of the onMessage method, after a close has been sent. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); ThreadTestCoordinator tt = new ThreadTestCoordinator(2); tt.addTestThread(testThread1, 0); tt.addTestThread(testThread2, 1); tt.setDeadlockTimeout(500); tt.run(); String errorMessage = tt.joinAndRetrieveMessages(); // Print any error messages or exceptions. log.debug(errorMessage); if (!tt.getExceptions().isEmpty()) { for (Exception e : tt.getExceptions()) { log.debug("Exception thrown during test thread: ", e); } } Assert.assertTrue(errorMessage, "".equals(errorMessage)); } public void closeBeforeAutoAckManyTimes() throws Exception { for (int i = 0; i < TEST_COUNT; i++) { testCloseBeforeAutoAck_QPID_397(); } } protected void setUp() throws Exception { super.setUp(); connection = getConnection("guest", "guest"); } protected void tearDown() throws Exception { super.tearDown(); } }