diff options
3 files changed, 68 insertions, 21 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index ce624cb91b..55d3ccb6e7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -3212,28 +3212,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void rejectPending(C consumer) { - synchronized (_lock) - { - boolean stopped = connectionStopped(); + // Reject messages on pre-receive queue + consumer.rollbackPendingMessages(); - if (!stopped) - { - setConnectionStopped(true); - } + // Reject messages on pre-dispatch queue + rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false); - // Reject messages on pre-receive queue - consumer.rollbackPendingMessages(); + // closeConsumer + consumer.markClosed(); - // Reject messages on pre-dispatch queue - rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false); - //Let the dispatcher deal with this when it gets to them. - - // closeConsumer - consumer.markClosed(); - - setConnectionStopped(stopped); - - } } public void rollback() @@ -3425,7 +3412,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { final C consumer = _consumers.get(message.getConsumerTag()); - if ((consumer == null) || consumer.isClosed()) + if ((consumer == null) || consumer.isClosed() || consumer.isClosing()) { if (_dispatcherLogger.isInfoEnabled()) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 0d717a3216..0f8b5717d6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -593,7 +593,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa // no point otherwise as the connection will be gone if (!_session.isClosed() || _session.isClosing()) { - sendCancel(); + synchronized(_session.getMessageDeliveryLock()) + { + sendCancel(); + } cleanupQueue(); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java new file mode 100644 index 0000000000..907290933a --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java @@ -0,0 +1,57 @@ +package org.apache.qpid.test.unit.close; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class MessageConsumerCloseTest extends QpidBrokerTestCase +{ + Exception _exception; + + public void testConsumerCloseAndSessionRollback() throws Exception + { + Connection connection = getConnection(); + final CountDownLatch receiveLatch = new CountDownLatch(1); + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = getTestQueue(); + MessageConsumer consumer = session.createConsumer(destination); + sendMessage(session, destination, 2); + connection.start(); + consumer.setMessageListener(new MessageListener() + { + @Override + public void onMessage(Message message) + { + try + { + receiveLatch.countDown(); + session.rollback(); + } + catch (JMSException e) + { + _exception = e; + } + } + }); + boolean messageReceived = receiveLatch.await(1l, TimeUnit.SECONDS); + consumer.close(); + + assertNull("Exception occured on rollback:" + _exception, _exception); + assertTrue("Message is not received", messageReceived); + + consumer = session.createConsumer(destination); + Message message1 = consumer.receive(1000l); + assertNotNull("message1 is not received", message1); + Message message2 = consumer.receive(1000l); + assertNotNull("message2 is not received", message2); + } +} |