From 32e3df31c2dacb8af8af198c82881f42fee285dc Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Fri, 6 Apr 2012 10:51:21 +0000 Subject: QPID-3911: Fix deadlock on concurrent invocation of MessageConsumer#close() and Session#rollback() from consumer MessageListener This patch contains the following changes: - Add synchronization on AMSession#_messageDeliveryLock into MessageConsumer#close() in order to block until message listener in progress has completed(as required in JMS javadoc for MessageConsumer#close()). - Change the session dispatcher to stop messages delivery into consumer local message queue if the consumer in the process of closing. This eliminates the need to stop the dispatcher on rejecting pending messages for closing consumer. - Remove the synchronization on the dispatcher lock from AMQSession.Dispatcher#rejectPending and code to stop the dispatcher, as we are synchronizing on the deliveryLock now and incoming messages are not dispatched into closing consumers anymore. - Add a system test to reproduce the deadlock and verify its resolution. Applied patch from Oleksandr Rudyy git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1310275 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 27 +++------- .../apache/qpid/client/BasicMessageConsumer.java | 5 +- .../test/unit/close/MessageConsumerCloseTest.java | 57 ++++++++++++++++++++++ 3 files changed, 68 insertions(+), 21 deletions(-) create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index ce624cb91b..55d3ccb6e7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -3212,28 +3212,15 @@ public abstract class AMQSession extends Closeable implements Messa // no point otherwise as the connection will be gone if (!_session.isClosed() || _session.isClosing()) { - sendCancel(); + synchronized(_session.getMessageDeliveryLock()) + { + sendCancel(); + } cleanupQueue(); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java new file mode 100644 index 0000000000..907290933a --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java @@ -0,0 +1,57 @@ +package org.apache.qpid.test.unit.close; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class MessageConsumerCloseTest extends QpidBrokerTestCase +{ + Exception _exception; + + public void testConsumerCloseAndSessionRollback() throws Exception + { + Connection connection = getConnection(); + final CountDownLatch receiveLatch = new CountDownLatch(1); + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = getTestQueue(); + MessageConsumer consumer = session.createConsumer(destination); + sendMessage(session, destination, 2); + connection.start(); + consumer.setMessageListener(new MessageListener() + { + @Override + public void onMessage(Message message) + { + try + { + receiveLatch.countDown(); + session.rollback(); + } + catch (JMSException e) + { + _exception = e; + } + } + }); + boolean messageReceived = receiveLatch.await(1l, TimeUnit.SECONDS); + consumer.close(); + + assertNull("Exception occured on rollback:" + _exception, _exception); + assertTrue("Message is not received", messageReceived); + + consumer = session.createConsumer(destination); + Message message1 = consumer.receive(1000l); + assertNotNull("message1 is not received", message1); + Message message2 = consumer.receive(1000l); + assertNotNull("message2 is not received", message2); + } +} -- cgit v1.2.1